diff --git a/.github/check_entitlements.sh b/.github/check_entitlements.sh index 89bddaa..ea98f1a 100755 --- a/.github/check_entitlements.sh +++ b/.github/check_entitlements.sh @@ -1,15 +1,11 @@ #!/bin/bash - # Derive additional environment variables TOKEN_URL="${OIDC_OP_TOKEN_ENDPOINT}" OTDF_HOST_AND_PORT="${OPENTDF_PLATFORM_HOST}" OTDF_CLIENT="${OPENTDF_CLIENT_ID}" OTDF_CLIENT_SECRET="${OPENTDF_CLIENT_SECRET}" -# Enable debug mode -DEBUG=1 - echo "🔧 Environment Configuration:" echo " TOKEN_URL: ${TOKEN_URL}" echo " OTDF_HOST_AND_PORT: ${OTDF_HOST_AND_PORT}" @@ -28,6 +24,8 @@ get_token() { echo "🔐 Getting access token..." BEARER=$( get_token | jq -r '.access_token' ) +# NOTE: It's always okay to print this token, because it will +# only be valid / available in dummy / dev scenarios [[ "${DEBUG:-}" == "1" ]] && echo "Got Access Token: ${BEARER}" echo "" diff --git a/.github/workflows/platform-integration-test.yaml b/.github/workflows/platform-integration-test.yaml index 23c2f25..f4d8420 100644 --- a/.github/workflows/platform-integration-test.yaml +++ b/.github/workflows/platform-integration-test.yaml @@ -163,8 +163,8 @@ jobs: OIDC_TOKEN_ENDPOINT: "http://localhost:8888/auth/realms/opentdf/protocol/openid-connect/token" OPENTDF_KAS_URL: "http://localhost:8080/kas" INSECURE_SKIP_VERIFY: "TRUE" - TEST_OPENTDF_ATTRIBUTE_1: "https://example.com/attr/attr1/value/value1" - TEST_OPENTDF_ATTRIBUTE_2: "https://example.com/attr/attr1/value/value2" + TEST_OPENTDF_ATTRIBUTE_1: "https://example.net/attr/attr1/value/value1" + TEST_OPENTDF_ATTRIBUTE_2: "https://example.com/attr/attr1/value/value1" run: | uv sync # Skip the tests marked "integration" @@ -180,8 +180,8 @@ jobs: OIDC_OP_TOKEN_ENDPOINT: "http://localhost:8888/auth/realms/opentdf/protocol/openid-connect/token" OPENTDF_KAS_URL: "http://localhost:8080/kas" INSECURE_SKIP_VERIFY: "TRUE" - TEST_OPENTDF_ATTRIBUTE_1: "https://example.com/attr/attr1/value/value1" - TEST_OPENTDF_ATTRIBUTE_2: "https://example.com/attr/attr1/value/value2" + TEST_OPENTDF_ATTRIBUTE_1: "https://example.net/attr/attr1/value/value1" + TEST_OPENTDF_ATTRIBUTE_2: "https://example.com/attr/attr1/value/value1" run: | # Run check_entitlements.sh ./.github/check_entitlements.sh diff --git a/.gitignore b/.gitignore index e0a62d0..993764d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,8 @@ # Edit at https://www.toptal.com/developers/gitignore?templates=python platform/ - +tests/integration/test_data/v4.2.2/*tdf +tests/integration/test_data/v4.3.1/*tdf ### Python ### # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index efbe70b..620ed7d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,29 +15,34 @@ exclude: | # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks# repos: - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v6.0.0 - hooks: - - id: check-yaml - - id: end-of-file-fixer - - id: trailing-whitespace - - repo: https://github.com/codespell-project/codespell - rev: v2.4.1 - hooks: - - id: codespell - args: ["--ignore-words-list", "b-long, otdf_python", "--skip=go.sum,otdf_python/"] + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/codespell-project/codespell + rev: v2.4.1 + hooks: + - id: codespell + args: + [ + "--ignore-words-list", + "b-long, otdf_python", + "--skip=uv.lock,otdf-python-proto/uv.lock", + ] - - repo: https://github.com/astral-sh/ruff-pre-commit - # Ruff version. - rev: v0.12.12 - hooks: - # Run the linter. - - id: ruff - # Run the formatter. - - id: ruff-format - - repo: https://github.com/compilerla/conventional-pre-commit - rev: v4.2.0 - hooks: - - id: conventional-pre-commit - stages: [commit-msg,post-rewrite] - args: [--verbose,--scopes="feat,fix,docs,style,test,chore,ci"] + - repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.12.12 + hooks: + # Run the linter. + - id: ruff-check + # Run the formatter. + - id: ruff-format + - repo: https://github.com/compilerla/conventional-pre-commit + rev: v4.2.0 + hooks: + - id: conventional-pre-commit + stages: [commit-msg] + args: [--verbose, --scopes="feat, fix, docs, style, test, chore, ci"] diff --git a/.release-please-manifest.json b/.release-please-manifest.json index 2ba2825..fa58eef 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,4 +1,3 @@ { - ".": "0.3.2", - "otdf-python-proto": "0.3.2" + ".": "0.3.2" } diff --git a/conftest.py b/conftest.py index a9bc2ab..b75e5b8 100644 --- a/conftest.py +++ b/conftest.py @@ -5,10 +5,18 @@ loaded by pytest when running tests. """ +from pathlib import Path + import pytest + from tests.server_logs import log_server_logs_on_failure +@pytest.fixture(scope="session") +def project_root(request) -> Path: + return request.config.rootpath # Project root + + @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item, call): """ @@ -43,7 +51,6 @@ def pytest_runtest_makereport(item, call): log_server_logs_on_failure(test_name) -# Optional: Add a fixture to manually collect logs @pytest.fixture def collect_server_logs(): """ diff --git a/pyproject.toml b/pyproject.toml index 2c9dac0..b73efbc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,8 @@ lint.select = [ "C4", # McCabe complexity "C90", + # isort + "I", # Performance-related rules "PERF", # Ruff's performance rules # Additional useful rules diff --git a/src/otdf_python/__init__.py b/src/otdf_python/__init__.py index df07890..f8dcd49 100644 --- a/src/otdf_python/__init__.py +++ b/src/otdf_python/__init__.py @@ -5,10 +5,10 @@ Provides both programmatic APIs and command-line interface for encryption and decryption. """ +from .cli import main as cli_main +from .config import KASInfo, NanoTDFConfig, TDFConfig from .sdk import SDK from .sdk_builder import SDKBuilder -from .config import TDFConfig, NanoTDFConfig, KASInfo -from .cli import main as cli_main __all__ = [ "SDK", diff --git a/src/otdf_python/aesgcm.py b/src/otdf_python/aesgcm.py index a7d7446..ced6427 100644 --- a/src/otdf_python/aesgcm.py +++ b/src/otdf_python/aesgcm.py @@ -1,6 +1,7 @@ -from cryptography.hazmat.primitives.ciphers.aead import AESGCM import os +from cryptography.hazmat.primitives.ciphers.aead import AESGCM + class AesGcm: GCM_NONCE_LENGTH = 12 diff --git a/src/otdf_python/asym_crypto.py b/src/otdf_python/asym_crypto.py index 78e3f8d..932bfa1 100644 --- a/src/otdf_python/asym_crypto.py +++ b/src/otdf_python/asym_crypto.py @@ -2,10 +2,9 @@ Asymmetric encryption and decryption utilities for RSA keys in PEM format. """ -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa, padding -from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.x509 import load_pem_x509_certificate from .sdk_exceptions import SDKException diff --git a/src/otdf_python/asym_decryption.py b/src/otdf_python/asym_decryption.py index 2ea7611..af11414 100644 --- a/src/otdf_python/asym_decryption.py +++ b/src/otdf_python/asym_decryption.py @@ -1,9 +1,9 @@ -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.backends import default_backend import base64 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + from .sdk_exceptions import SDKException diff --git a/src/otdf_python/asym_encryption.py b/src/otdf_python/asym_encryption.py index e385817..7e5e27a 100644 --- a/src/otdf_python/asym_encryption.py +++ b/src/otdf_python/asym_encryption.py @@ -1,11 +1,11 @@ -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives import hashes -from cryptography.x509 import load_pem_x509_certificate -from cryptography.hazmat.backends import default_backend import base64 import re +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.x509 import load_pem_x509_certificate + from .sdk_exceptions import SDKException diff --git a/src/otdf_python/cli.py b/src/otdf_python/cli.py index a8df80b..9fecf7d 100644 --- a/src/otdf_python/cli.py +++ b/src/otdf_python/cli.py @@ -7,19 +7,19 @@ """ import argparse +import contextlib import json import logging import sys +from dataclasses import asdict from io import BytesIO from pathlib import Path -from otdf_python.sdk_builder import SDKBuilder +from otdf_python.config import KASInfo, NanoTDFConfig, TDFConfig from otdf_python.sdk import SDK -from otdf_python.config import TDFConfig, NanoTDFConfig, KASInfo -from otdf_python.tdf import TDFReaderConfig +from otdf_python.sdk_builder import SDKBuilder from otdf_python.sdk_exceptions import SDKException -import contextlib - +from otdf_python.tdf import TDFReaderConfig # Version - get from project metadata __version__ = "0.3.2" @@ -355,27 +355,21 @@ def cmd_inspect(args): # Validate input file input_path = validate_file_exists(args.file) - # For inspection, we don't need full authentication - # Create a minimal SDK for reading metadata + # For inspection, try to create authenticated SDK, but allow unauthenticated inspection too try: - builder = SDKBuilder() - if args.platform_url: - builder.set_platform_endpoint(args.platform_url) - - # For inspection, we may not need authentication depending on the TDF - if args.client_id and args.client_secret: - builder.client_secret(args.client_id, args.client_secret) - elif args.auth: - auth_parts = args.auth.split(":") - if len(auth_parts) == 2: - builder.client_secret(auth_parts[0], auth_parts[1]) - - if args.plaintext: - builder.use_insecure_plaintext_connection(True) - if args.insecure: - builder.use_insecure_skip_verify(True) - - sdk = builder.build() + try: + sdk = build_sdk(args) + except CLIError as auth_error: + # If authentication fails, create minimal SDK for basic inspection + logger.warning(f"Authentication failed, using minimal SDK: {auth_error}") + builder = SDKBuilder() + if args.platform_url: + builder.set_platform_endpoint(args.platform_url) + if hasattr(args, "plaintext") and args.plaintext: + builder.use_insecure_plaintext_connection(True) + if args.insecure: + builder.use_insecure_skip_verify(True) + sdk = builder.build() try: # Read encrypted file @@ -395,12 +389,12 @@ def cmd_inspect(args): try: data_attributes = [] # This would need to be implemented in the SDK inspection_result = { - "manifest": manifest, + "manifest": asdict(manifest), "dataAttributes": data_attributes, } except Exception as e: logger.warning(f"Could not retrieve data attributes: {e}") - inspection_result = {"manifest": manifest} + inspection_result = {"manifest": asdict(manifest)} print(json.dumps(inspection_result, indent=2, default=str)) else: diff --git a/src/otdf_python/config.py b/src/otdf_python/config.py index 0531458..646acec 100644 --- a/src/otdf_python/config.py +++ b/src/otdf_python/config.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field from enum import Enum -from urllib.parse import urlparse, urlunparse from typing import Any +from urllib.parse import urlparse, urlunparse class TDFFormat(Enum): diff --git a/src/otdf_python/crypto_utils.py b/src/otdf_python/crypto_utils.py index 2b80e79..b32a5e9 100644 --- a/src/otdf_python/crypto_utils.py +++ b/src/otdf_python/crypto_utils.py @@ -1,8 +1,9 @@ -import hmac import hashlib -from cryptography.hazmat.primitives.asymmetric import rsa, ec -from cryptography.hazmat.primitives import serialization +import hmac + from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec, rsa class CryptoUtils: diff --git a/src/otdf_python/dpop.py b/src/otdf_python/dpop.py index d27bc0f..c442a5e 100644 --- a/src/otdf_python/dpop.py +++ b/src/otdf_python/dpop.py @@ -2,9 +2,10 @@ DPoP (Demonstration of Proof-of-Possession) token generation utilities. """ -import time -import hashlib import base64 +import hashlib +import time + import jwt from .crypto_utils import CryptoUtils diff --git a/src/otdf_python/eckeypair.py b/src/otdf_python/eckeypair.py index f463abc..3dee0aa 100644 --- a/src/otdf_python/eckeypair.py +++ b/src/otdf_python/eckeypair.py @@ -1,14 +1,14 @@ +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.serialization import ( Encoding, - PublicFormat, - PrivateFormat, NoEncryption, + PrivateFormat, + PublicFormat, ) -from cryptography.hazmat.backends import default_backend -from cryptography.exceptions import InvalidSignature class ECKeyPair: diff --git a/src/otdf_python/header.py b/src/otdf_python/header.py index ceae3f6..df7186d 100644 --- a/src/otdf_python/header.py +++ b/src/otdf_python/header.py @@ -1,8 +1,8 @@ -from otdf_python.resource_locator import ResourceLocator +from otdf_python.constants import MAGIC_NUMBER_AND_VERSION from otdf_python.ecc_mode import ECCMode -from otdf_python.symmetric_and_payload_config import SymmetricAndPayloadConfig from otdf_python.policy_info import PolicyInfo -from otdf_python.constants import MAGIC_NUMBER_AND_VERSION +from otdf_python.resource_locator import ResourceLocator +from otdf_python.symmetric_and_payload_config import SymmetricAndPayloadConfig class Header: diff --git a/src/otdf_python/kas_client.py b/src/otdf_python/kas_client.py index 39bba64..43b3c6a 100644 --- a/src/otdf_python/kas_client.py +++ b/src/otdf_python/kas_client.py @@ -2,21 +2,22 @@ KASClient: Handles communication with the Key Access Service (KAS). """ -import time -import logging +import base64 import hashlib +import logging import secrets -import base64 +import time from base64 import b64decode from dataclasses import dataclass + import jwt -from .kas_key_cache import KASKeyCache -from .sdk_exceptions import SDKException -from .crypto_utils import CryptoUtils from .asym_decryption import AsymDecryption -from .key_type_constants import RSA_KEY_TYPE, EC_KEY_TYPE +from .crypto_utils import CryptoUtils from .kas_connect_rpc_client import KASConnectRPCClient +from .kas_key_cache import KASKeyCache +from .key_type_constants import EC_KEY_TYPE, RSA_KEY_TYPE +from .sdk_exceptions import SDKException @dataclass diff --git a/src/otdf_python/kas_connect_rpc_client.py b/src/otdf_python/kas_connect_rpc_client.py index c8b7319..3b39021 100644 --- a/src/otdf_python/kas_connect_rpc_client.py +++ b/src/otdf_python/kas_connect_rpc_client.py @@ -4,12 +4,13 @@ """ import logging -import urllib3 -from .sdk_exceptions import SDKException +import urllib3 from otdf_python_proto.kas import kas_pb2 from otdf_python_proto.kas.kas_pb2_connect import AccessServiceClient +from .sdk_exceptions import SDKException + class KASConnectRPCClient: """ diff --git a/src/otdf_python/manifest.py b/src/otdf_python/manifest.py index 1d771da..1ebbae3 100644 --- a/src/otdf_python/manifest.py +++ b/src/otdf_python/manifest.py @@ -1,6 +1,6 @@ -from dataclasses import dataclass, field, asdict -from typing import Any import json +from dataclasses import asdict, dataclass, field +from typing import Any @dataclass diff --git a/src/otdf_python/nanotdf.py b/src/otdf_python/nanotdf.py index 9e896fa..d8a063e 100644 --- a/src/otdf_python/nanotdf.py +++ b/src/otdf_python/nanotdf.py @@ -1,20 +1,22 @@ -from cryptography.hazmat.primitives.ciphers.aead import AESGCM -from otdf_python.asym_crypto import AsymDecryption +import hashlib +import json import secrets -from typing import BinaryIO from io import BytesIO +from typing import BinaryIO + +from cryptography.hazmat.primitives.ciphers.aead import AESGCM + +from otdf_python.asym_crypto import AsymDecryption from otdf_python.collection_store import CollectionStore, NoOpCollectionStore -from otdf_python.policy_stub import NULL_POLICY_UUID -from otdf_python.sdk_exceptions import SDKException +from otdf_python.config import KASInfo, NanoTDFConfig from otdf_python.constants import MAGIC_NUMBER_AND_VERSION -from otdf_python.resource_locator import ResourceLocator -from otdf_python.policy_object import PolicyObject, PolicyBody, AttributeObject -from otdf_python.symmetric_and_payload_config import SymmetricAndPayloadConfig from otdf_python.ecc_mode import ECCMode -import json -import hashlib from otdf_python.policy_info import PolicyInfo -from otdf_python.config import NanoTDFConfig, KASInfo +from otdf_python.policy_object import AttributeObject, PolicyBody, PolicyObject +from otdf_python.policy_stub import NULL_POLICY_UUID +from otdf_python.resource_locator import ResourceLocator +from otdf_python.sdk_exceptions import SDKException +from otdf_python.symmetric_and_payload_config import SymmetricAndPayloadConfig class NanoTDFException(SDKException): @@ -54,7 +56,7 @@ def _create_policy_object(self, attributes: list[str]) -> PolicyObject: def _serialize_policy_object(self, obj): """Custom NanoTDF serializer to convert to compatible JSON format.""" - from otdf_python.policy_object import PolicyBody, AttributeObject + from otdf_python.policy_object import AttributeObject, PolicyBody if isinstance(obj, PolicyBody): # Convert data_attributes to dataAttributes and use null instead of empty array @@ -224,10 +226,9 @@ def _wrap_key_if_needed( break if kas_public_key: - from cryptography.hazmat.primitives import serialization - from cryptography.hazmat.primitives.asymmetric import padding - from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding public_key = serialization.load_pem_public_key( kas_public_key.encode(), backend=default_backend() diff --git a/src/otdf_python/sdk.py b/src/otdf_python/sdk.py index f69ec3d..407db14 100644 --- a/src/otdf_python/sdk.py +++ b/src/otdf_python/sdk.py @@ -2,14 +2,14 @@ Python port of the main SDK class for OpenTDF platform interaction. """ -from typing import Any, BinaryIO -from io import BytesIO from contextlib import AbstractContextManager +from io import BytesIO +from typing import Any, BinaryIO -from otdf_python.tdf import TDF, TDFReaderConfig, TDFReader +from otdf_python.config import NanoTDFConfig, TDFConfig from otdf_python.nanotdf import NanoTDF from otdf_python.sdk_exceptions import SDKException -from otdf_python.config import NanoTDFConfig, TDFConfig +from otdf_python.tdf import TDF, TDFReader, TDFReaderConfig # Stubs for service client interfaces (to be implemented) @@ -180,14 +180,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): class SDK(AbstractContextManager): def new_tdf_config( self, attributes: list[str] | None = None, **kwargs - ) -> "TDFConfig": + ) -> TDFConfig: """ Create a TDFConfig with default kas_info_list from the SDK's platform_url. - Based on Java implementation. """ - from otdf_python.config import TDFConfig, KASInfo + from otdf_python.config import KASInfo - platform_url = self.platform_url or "https://default.kas.example.com" + if self.platform_url is None: + raise SDKException("Cannot create TDFConfig: SDK platform_url is not set.") # Get use_plaintext setting - allow override via kwargs, fall back to SDK setting use_plaintext = kwargs.pop( @@ -198,7 +198,7 @@ def new_tdf_config( # Include explicit port for HTTPS to match otdfctl behavior from urllib.parse import urlparse - parsed_url = urlparse(platform_url) + parsed_url = urlparse(self.platform_url) # Determine scheme and default port based on use_plaintext setting if use_plaintext: diff --git a/src/otdf_python/sdk_builder.py b/src/otdf_python/sdk_builder.py index 87e34e0..ead42f5 100644 --- a/src/otdf_python/sdk_builder.py +++ b/src/otdf_python/sdk_builder.py @@ -3,14 +3,15 @@ Provides methods to configure and build SDK instances. """ -from typing import Any -import os import logging +import os import ssl -import httpx from dataclasses import dataclass +from typing import Any + +import httpx -from otdf_python.sdk import SDK, KAS +from otdf_python.sdk import KAS, SDK from otdf_python.sdk_exceptions import AutoConfigureException # Configure logging diff --git a/src/otdf_python/tdf.py b/src/otdf_python/tdf.py index 51428fb..3dd35af 100644 --- a/src/otdf_python/tdf.py +++ b/src/otdf_python/tdf.py @@ -1,31 +1,32 @@ -from typing import BinaryIO, TYPE_CHECKING -import io -import os +import base64 import hashlib import hmac -import base64 -import zipfile +import io import logging +import os +import zipfile +from typing import TYPE_CHECKING, BinaryIO if TYPE_CHECKING: from otdf_python.kas_client import KASClient +from dataclasses import dataclass + +from otdf_python.aesgcm import AesGcm +from otdf_python.config import TDFConfig +from otdf_python.key_type_constants import RSA_KEY_TYPE from otdf_python.manifest import ( Manifest, - ManifestSegment, - ManifestIntegrityInformation, - ManifestRootSignature, ManifestEncryptionInformation, - ManifestPayload, - ManifestMethod, + ManifestIntegrityInformation, ManifestKeyAccess, + ManifestMethod, + ManifestPayload, + ManifestRootSignature, + ManifestSegment, ) from otdf_python.policy_stub import NULL_POLICY_UUID from otdf_python.tdf_writer import TDFWriter -from otdf_python.aesgcm import AesGcm -from dataclasses import dataclass -from otdf_python.key_type_constants import RSA_KEY_TYPE -from otdf_python.config import TDFConfig @dataclass @@ -83,10 +84,11 @@ def _validate_kas_infos(self, kas_infos): return validated_kas_infos def _wrap_key_for_kas(self, key, kas_infos, policy_json=None): - from otdf_python.asym_crypto import AsymEncryption import hashlib import hmac + from otdf_python.asym_crypto import AsymEncryption + key_access_objs = [] for kas in kas_infos: asym = AsymEncryption(kas.public_key) @@ -161,7 +163,7 @@ def _build_policy_json(self, config: TDFConfig) -> str: def _serialize_policy_object(self, obj): """Custom TDF serializer to convert to compatible JSON format.""" - from otdf_python.policy_object import PolicyBody, AttributeObject + from otdf_python.policy_object import AttributeObject, PolicyBody if isinstance(obj, PolicyBody): # Convert data_attributes to dataAttributes and use null instead of empty array @@ -277,7 +279,7 @@ def create_tdf( self, payload: bytes | BinaryIO, config: TDFConfig, - output_stream: BinaryIO | None = None, + output_stream: io.BytesIO | None = None, ): if output_stream is None: output_stream = io.BytesIO() @@ -375,9 +377,14 @@ def create_tdf( size = writer.finish() return manifest, size, output_stream - def load_tdf(self, tdf_bytes: bytes, config: TDFReaderConfig) -> TDFReader: + def load_tdf( + self, tdf_data: bytes | io.BytesIO, config: TDFReaderConfig + ) -> TDFReader: # Extract manifest, unwrap payload key using KAS client - with zipfile.ZipFile(io.BytesIO(tdf_bytes), "r") as z: + # Handle both bytes and BinaryIO input + tdf_bytes_io = io.BytesIO(tdf_data) if isinstance(tdf_data, bytes) else tdf_data + + with zipfile.ZipFile(tdf_bytes_io, "r") as z: manifest_json = z.read("0.manifest.json").decode() manifest = Manifest.from_json(manifest_json) @@ -419,10 +426,11 @@ def read_payload( """ Reads and verifies TDF segments, decrypts if needed, and writes the payload to output_stream. """ + import base64 import zipfile + from otdf_python.aesgcm import AesGcm from otdf_python.asym_crypto import AsymDecryption - import base64 with zipfile.ZipFile(io.BytesIO(tdf_bytes), "r") as z: manifest_json = z.read("0.manifest.json").decode() diff --git a/src/otdf_python/tdf_reader.py b/src/otdf_python/tdf_reader.py index 8e797b7..a414f16 100644 --- a/src/otdf_python/tdf_reader.py +++ b/src/otdf_python/tdf_reader.py @@ -2,10 +2,10 @@ TDFReader is responsible for reading and processing Trusted Data Format (TDF) files. """ -from .zip_reader import ZipReader -from .sdk_exceptions import SDKException -from .policy_object import PolicyObject from .manifest import Manifest +from .policy_object import PolicyObject +from .sdk_exceptions import SDKException +from .zip_reader import ZipReader # Constants from TDFWriter TDF_MANIFEST_FILE_NAME = "0.manifest.json" @@ -119,9 +119,9 @@ def read_policy_object(self) -> PolicyObject: # Convert to PolicyObject from otdf_python.policy_object import ( - PolicyObject, - PolicyBody, AttributeObject, + PolicyBody, + PolicyObject, ) # Parse data attributes - handle case where body might be None or have None values diff --git a/src/otdf_python/tdf_writer.py b/src/otdf_python/tdf_writer.py index 93ad9a9..6dcd7d5 100644 --- a/src/otdf_python/tdf_writer.py +++ b/src/otdf_python/tdf_writer.py @@ -1,4 +1,5 @@ import io + from otdf_python.zip_writer import ZipWriter diff --git a/src/otdf_python/token_source.py b/src/otdf_python/token_source.py index f3610bc..0c60c3a 100644 --- a/src/otdf_python/token_source.py +++ b/src/otdf_python/token_source.py @@ -3,6 +3,7 @@ """ import time + import httpx diff --git a/src/otdf_python/zip_reader.py b/src/otdf_python/zip_reader.py index ddc8e82..78d7ecb 100644 --- a/src/otdf_python/zip_reader.py +++ b/src/otdf_python/zip_reader.py @@ -1,5 +1,6 @@ -import zipfile import io +import zipfile + from otdf_python.invalid_zip_exception import InvalidZipException diff --git a/src/otdf_python/zip_writer.py b/src/otdf_python/zip_writer.py index a96b551..e548d97 100644 --- a/src/otdf_python/zip_writer.py +++ b/src/otdf_python/zip_writer.py @@ -1,5 +1,5 @@ -import zipfile import io +import zipfile import zlib diff --git a/tests/config_pydantic.py b/tests/config_pydantic.py index 076a7f7..457f01c 100644 --- a/tests/config_pydantic.py +++ b/tests/config_pydantic.py @@ -14,8 +14,8 @@ """ -from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict class ConfigureTdf(BaseSettings): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 52c1030..88525ef 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -3,11 +3,16 @@ """ import json +import logging import tempfile from pathlib import Path import pytest +from tests.support_otdfctl_args import otdfctl_generate_tdf_files_for_target_mode + +logger = logging.getLogger(__name__) + @pytest.fixture(scope="session") def temp_credentials_file(): @@ -18,3 +23,52 @@ def temp_credentials_file(): with open(creds_file, "w") as f: json.dump(creds_data, f) yield creds_file + + +@pytest.fixture(scope="session") +def test_data_dir(): + """Get the path to the test data directory.""" + return Path(__file__).parent / "test_data" + + +@pytest.fixture(scope="session") +def sample_input_files(test_data_dir): + """Provide paths to sample input files for TDF generation.""" + return { + "text": test_data_dir / "sample_text.txt", + # "empty": test_data_dir / "empty_file.txt", + "binary": test_data_dir / "sample_binary.png", + "with_attributes": test_data_dir / "sample_with_attributes.txt", + } + + +@pytest.fixture(scope="session") +def tdf_v4_2_2_files(temp_credentials_file, test_data_dir, sample_input_files): + """Generate TDF files with target mode v4.2.2.""" + tdf_files = otdfctl_generate_tdf_files_for_target_mode( + "v4.2.2", temp_credentials_file, test_data_dir, sample_input_files + ) + yield tdf_files + + +@pytest.fixture(scope="session") +def tdf_v4_3_1_files(temp_credentials_file, test_data_dir, sample_input_files): + """Generate TDF files with target mode v4.3.1.""" + tdf_files = otdfctl_generate_tdf_files_for_target_mode( + "v4.3.1", temp_credentials_file, test_data_dir, sample_input_files + ) + yield tdf_files + + +@pytest.fixture(scope="session") +def all_target_mode_tdf_files(tdf_v4_2_2_files, tdf_v4_3_1_files): + """Combine all target mode TDF files into a single fixture.""" + return { + "v4.2.2": tdf_v4_2_2_files, + "v4.3.1": tdf_v4_3_1_files, + } + + +@pytest.fixture(scope="session") +def known_target_modes(): + return ["v4.2.2", "v4.3.1"] diff --git a/tests/integration/otdfctl_only/test_otdfctl_generated_fixtures.py b/tests/integration/otdfctl_only/test_otdfctl_generated_fixtures.py new file mode 100644 index 0000000..ccff47c --- /dev/null +++ b/tests/integration/otdfctl_only/test_otdfctl_generated_fixtures.py @@ -0,0 +1,113 @@ +import pytest + +from tests.support_common import validate_tdf3_file + + +@pytest.mark.integration +def test_test_data_directory_structure(tdf_v4_2_2_files, tdf_v4_3_1_files): + """Test that the TDF files are properly generated by fixtures.""" + + # Check v4.2.2 TDF files exist and are valid + expected_v4_2_2_files = ["text", "binary", "with_attributes"] + for file_key in expected_v4_2_2_files: + assert file_key in tdf_v4_2_2_files, ( + f"v4.2.2 TDF file key should exist: {file_key}" + ) + tdf_file_path = tdf_v4_2_2_files[file_key] + validate_tdf3_file( + tdf_file_path, f"otdfctl generated using target mode v4.2.2 {file_key}" + ) + + # Check v4.3.1 TDF files exist and are valid + expected_v4_3_1_files = ["text", "binary", "with_attributes"] + for file_key in expected_v4_3_1_files: + assert file_key in tdf_v4_3_1_files, ( + f"v4.3.1 TDF file key should exist: {file_key}" + ) + tdf_file_path = tdf_v4_3_1_files[file_key] + validate_tdf3_file( + tdf_file_path, f"otdfctl generated using target mode v4.3.1 {file_key}" + ) + + # Verify the TDF files are in the correct directory structure + for file_path in tdf_v4_2_2_files.values(): + assert "v4.2.2" in str(file_path), ( + f"v4.2.2 TDF file should be in v4.2.2 directory: {file_path}" + ) + + for file_path in tdf_v4_3_1_files.values(): + assert "v4.3.1" in str(file_path), ( + f"v4.3.1 TDF file should be in v4.3.1 directory: {file_path}" + ) + + +@pytest.mark.integration +def test_sample_file_contents(sample_input_files): + """Test that sample input files have expected content.""" + + # Check text file has content + text_file = sample_input_files["text"] + assert text_file.exists(), f"Text file should exist: {text_file}" + with open(text_file) as f: + content = f.read() + assert "Hello, World!" in content + assert len(content) > 0 + + # Check empty file is empty + # empty_file = sample_input_files["empty"] + # assert empty_file.exists(), f"Empty file should exist: {empty_file}" + # assert empty_file.stat().st_size == 0 + + # Check binary file exists and has content + binary_file = sample_input_files["binary"] + assert binary_file.exists(), f"Binary file should exist: {binary_file}" + assert binary_file.stat().st_size > 0 + + # Check attributes file has content + attr_file = sample_input_files["with_attributes"] + assert attr_file.exists(), f"Attributes file should exist: {attr_file}" + with open(attr_file) as f: + content = f.read() + assert "Classification: SECRET" in content + + +@pytest.mark.integration +def test_target_mode_fixtures_exist(all_target_mode_tdf_files, known_target_modes): + """Test that target mode fixtures generate TDF files correctly.""" + # Check that we have both versions + assert "v4.2.2" in all_target_mode_tdf_files + assert "v4.3.1" in all_target_mode_tdf_files + + # Check each version has the expected file types + for target_mode in known_target_modes: + tdf_files = all_target_mode_tdf_files[target_mode] + + # Check all expected file types exist + expected_types = [ + "text", + "binary", + "with_attributes", + ] # Consider 'empty' as well + for file_type in expected_types: + assert file_type in tdf_files, f"Missing {file_type} TDF for {target_mode}" + + # Check the TDF file exists and is not empty + tdf_path = tdf_files[file_type] + validate_tdf3_file( + tdf_path, + f"otdfctl generated using target-mode {target_mode} {file_type}", + ) + + +@pytest.mark.integration +def test_v4_2_2_tdf_files(tdf_v4_2_2_files): + """Test that v4.2.2 TDF fixtures work independently.""" + assert "text" in tdf_v4_2_2_files + assert tdf_v4_2_2_files["text"].exists() + + +@pytest.mark.integration +def test_v4_3_1_tdf_files(tdf_v4_3_1_files): + """Test that v4.3.1 TDF fixtures work independently.""" + assert "text" in tdf_v4_3_1_files + assert tdf_v4_3_1_files["text"].exists() diff --git a/tests/integration/otdfctl_to_python/test_cli_comparison.py b/tests/integration/otdfctl_to_python/test_cli_comparison.py new file mode 100644 index 0000000..d5d428a --- /dev/null +++ b/tests/integration/otdfctl_to_python/test_cli_comparison.py @@ -0,0 +1,178 @@ +""" +Test CLI functionality +""" + +import tempfile +from pathlib import Path + +import pytest + +from tests.support_cli_args import run_cli_decrypt +from tests.support_common import ( + handle_subprocess_error, + validate_plaintext_file_created, + validate_tdf3_file, +) +from tests.support_otdfctl_args import ( + run_otdfctl_decrypt_command, + run_otdfctl_encrypt_command, +) + + +@pytest.mark.integration +def test_otdfctl_encrypt_python_decrypt( + collect_server_logs, temp_credentials_file, project_root +): + """Integration test that uses otdfctl for encryption and the Python CLI for decryption""" + + # Create temporary directory for work + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create input file + input_file = temp_path / "input.txt" + input_content = "Hello, World! This is a test for otdfctl decrypt comparison." + with open(input_file, "w") as f: + f.write(input_content) + + # Define TDF file created by otdfctl + otdfctl_tdf_output = temp_path / "hello-world-otdfctl.txt.tdf" + + # Define decrypted outputs from both tools + otdfctl_decrypt_output = temp_path / "decrypted-by-otdfctl.txt" + cli_decrypt_output = temp_path / "decrypted-by-cli.txt" + + # Run otdfctl encrypt first to create a TDF file + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + cwd=temp_path, + ) + + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) + + validate_tdf3_file(otdfctl_tdf_output, "otdfctl") + + # Now run otdfctl decrypt (this is the reference implementation) + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, + cwd=temp_path, + ) + + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt", + ) + + # Run our Python CLI decrypt on the same TDF + cli_decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=otdfctl_tdf_output, + output_file=cli_decrypt_output, + cwd=project_root, + ) + + # Fail fast on errors + handle_subprocess_error( + result=cli_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt", + ) + + validate_plaintext_file_created( + path=otdfctl_decrypt_output, + scenario="otdfctl", + expected_content=input_content, + ) + validate_plaintext_file_created( + path=cli_decrypt_output, + scenario="Python CLI", + expected_content=input_content, + ) + + +@pytest.mark.integration +def test_otdfctl_encrypt_otdfctl_decrypt(collect_server_logs, temp_credentials_file): + """Integration test that uses otdfctl for both encryption and decryption to verify roundtrip functionality""" + + # Create temporary directory for work + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Create input file + input_file = temp_path / "input.txt" + input_content = ( + "Hello, World! This is a test for otdfctl roundtrip encryption/decryption." + ) + with open(input_file, "w") as f: + f.write(input_content) + + # Define TDF file and decrypted output + otdfctl_tdf_output = temp_path / "otdfctl-roundtrip.txt.tdf" + otdfctl_decrypt_output = temp_path / "otdfctl-roundtrip-decrypted.txt" + + # Run otdfctl encrypt + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", + cwd=temp_path, + ) + + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) + + # Verify the TDF file was created + validate_tdf3_file(tdf_path=otdfctl_tdf_output, tool_name="otdfctl") + + # Run otdfctl decrypt + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, + cwd=temp_path, + ) + + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt", + ) + + validate_plaintext_file_created( + path=otdfctl_decrypt_output, + scenario="otdfctl", + expected_content=input_content, + ) + + # Verify file sizes are reasonable + original_size = input_file.stat().st_size + tdf_size = otdfctl_tdf_output.stat().st_size + decrypted_size = otdfctl_decrypt_output.stat().st_size + + assert tdf_size > original_size, "TDF file should be larger than original" + + print( + f"✓ otdfctl roundtrip successful: {original_size} bytes -> {tdf_size} bytes -> {decrypted_size} bytes" + ) + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/integration/otdfctl_to_python/test_cli_decrypt.py b/tests/integration/otdfctl_to_python/test_cli_decrypt.py new file mode 100644 index 0000000..dc61123 --- /dev/null +++ b/tests/integration/otdfctl_to_python/test_cli_decrypt.py @@ -0,0 +1,191 @@ +""" +Tests using target mode fixtures, for CLI integration testing. +""" + +import logging +import subprocess +import tempfile +from pathlib import Path + +import pytest + +from tests.support_cli_args import ( + run_cli_decrypt, +) +from tests.support_common import handle_subprocess_error + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +def test_cli_decrypt_v4_2_2_vs_v4_3_1( + all_target_mode_tdf_files, temp_credentials_file, collect_server_logs, project_root +): + """ + Test Python CLI decrypt with various TDF versions created by otdfctl. + """ + + v4_2_2_files = all_target_mode_tdf_files["v4.2.2"] + v4_3_1_files = all_target_mode_tdf_files["v4.3.1"] + + # Test decrypt on both versions of the same file type + for file_type in ["text", "binary"]: + v4_2_2_tdf = v4_2_2_files[file_type] + v4_3_1_tdf = v4_3_1_files[file_type] + + # Decrypt v4.2.2 TDF + v4_2_2_output = _run_cli_decrypt( + v4_2_2_tdf, temp_credentials_file, project_root, collect_server_logs + ) + + # Decrypt v4.3.1 TDF + v4_3_1_output = _run_cli_decrypt( + v4_3_1_tdf, temp_credentials_file, project_root, collect_server_logs + ) + + # Both should succeed and produce output files + assert v4_2_2_output is not None, f"Failed to decrypt v4.2.2 {file_type} TDF" + assert v4_3_1_output is not None, f"Failed to decrypt v4.3.1 {file_type} TDF" + + assert v4_2_2_output.exists(), ( + f"v4.2.2 {file_type} decrypt output file not created" + ) + assert v4_3_1_output.exists(), ( + f"v4.3.1 {file_type} decrypt output file not created" + ) + + # Both output files should have content (not empty) + assert v4_2_2_output.stat().st_size > 0, ( + f"v4.2.2 {file_type} decrypt produced empty file" + ) + assert v4_3_1_output.stat().st_size > 0, ( + f"v4.3.1 {file_type} decrypt produced empty file" + ) + + # Log the decryption results for comparison + logger.info(f"\n=== {file_type.upper()} TDF Decryption Comparison ===") + logger.info(f"v4.2.2 output size: {v4_2_2_output.stat().st_size} bytes") + logger.info(f"v4.3.1 output size: {v4_3_1_output.stat().st_size} bytes") + + # For text files, we can compare the content directly + if file_type == "text": + v4_2_2_content = v4_2_2_output.read_text() + v4_3_1_content = v4_3_1_output.read_text() + + logger.info(f"v4.2.2 content preview: {v4_2_2_content[:50]}...") + logger.info(f"v4.3.1 content preview: {v4_3_1_content[:50]}...") + + # Clean up output files + v4_2_2_output.unlink() + v4_3_1_output.unlink() + + +@pytest.mark.integration +def test_cli_decrypt_different_file_types( + all_target_mode_tdf_files, + temp_credentials_file, + collect_server_logs, + project_root, + known_target_modes, +): + """ + Test CLI decrypt with different file types. + """ + + assert "v4.2.2" in all_target_mode_tdf_files + assert "v4.3.1" in all_target_mode_tdf_files + + # Check each version has the expected file types + for target_mode in known_target_modes: + tdf_files = all_target_mode_tdf_files[target_mode] + + file_types_to_test = [ + "text", + "binary", + "with_attributes", + ] # TODO: Consider adding "empty" file type as well + + for file_type in file_types_to_test: + tdf_path = tdf_files[file_type] + + # Decrypt the TDF + output_file = _run_cli_decrypt( + tdf_path, temp_credentials_file, project_root, collect_server_logs + ) + + assert output_file is not None, f"Failed to decrypt {file_type} TDF" + assert output_file.exists(), ( + f"{file_type} TDF decrypt output file not created" + ) + + # Check file-type specific expectations + if file_type == "empty": + # Empty files should produce empty output files + assert output_file.stat().st_size == 0, ( + f"{file_type} TDF should produce empty output" + ) + else: + # Non-empty files should produce non-empty output + assert output_file.stat().st_size > 0, ( + f"{file_type} TDF produced empty decrypt output" + ) + + # For attributed files, just ensure they decrypt successfully + if file_type == "with_attributes": + logger.info( + f"Successfully decrypted attributed TDF, output size: {output_file.stat().st_size}" + ) + + # For text files, verify the content is readable + if file_type == "text": + try: + content = output_file.read_text() + assert len(content) > 0, "Text file should have readable content" + logger.info(f"Text content preview: {content[:100]}...") + except UnicodeDecodeError: + pytest.fail(f"Decrypted {file_type} file should be valid text") + + # Clean up output file + output_file.unlink() + + +def _run_cli_decrypt( + tdf_path: Path, creds_file: Path, cwd: Path, collect_server_logs +) -> Path | None: + """ + Helper function to run Python CLI decrypt command and return the output file path. + + Returns the Path to the decrypted output file if successful, None if failed. + """ + # Create a temporary output file + with tempfile.NamedTemporaryFile(delete=False, suffix=".decrypted") as temp_file: + output_path = Path(temp_file.name) + + try: + # Build CLI command + cli_decrypt_result = run_cli_decrypt( + creds_file=creds_file, + input_file=tdf_path, + output_file=output_path, + cwd=cwd, + ) + + # Fail fast on errors + handle_subprocess_error( + result=cli_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt", + ) + + return output_path + + except subprocess.CalledProcessError as e: + logger.error(f"CLI decrypt failed for {tdf_path}: {e}") + logger.error(f"CLI stderr: {e.stderr}") + logger.error(f"CLI stdout: {e.stdout}") + + # Clean up the output file if it was created but command failed + if output_path.exists(): + output_path.unlink() + + raise Exception(f"Failed to decrypt TDF {tdf_path}: {e}") from e diff --git a/tests/integration/otdfctl_to_python/test_cli_inspect.py b/tests/integration/otdfctl_to_python/test_cli_inspect.py new file mode 100644 index 0000000..1ba39cf --- /dev/null +++ b/tests/integration/otdfctl_to_python/test_cli_inspect.py @@ -0,0 +1,122 @@ +""" +Tests using target mode fixtures, for CLI integration testing. +""" + +import logging + +import pytest + +from tests.support_cli_args import run_cli_inspect + +logger = logging.getLogger(__name__) + + +@pytest.mark.integration +def test_cli_inspect_v4_2_2_vs_v4_3_1( + all_target_mode_tdf_files, temp_credentials_file, project_root +): + """ + Test Python CLI inspect with various TDF versions created by otdfctl. + """ + + v4_2_2_files = all_target_mode_tdf_files["v4.2.2"] + v4_3_1_files = all_target_mode_tdf_files["v4.3.1"] + + # Test inspect on both versions of the same file type + for file_type in ["text", "binary"]: + v4_2_2_tdf = v4_2_2_files[file_type] + v4_3_1_tdf = v4_3_1_files[file_type] + + # Inspect v4.2.2 TDF + v4_2_2_result = run_cli_inspect(v4_2_2_tdf, temp_credentials_file, project_root) + + # Inspect v4.3.1 TDF + v4_3_1_result = run_cli_inspect(v4_3_1_tdf, temp_credentials_file, project_root) + + # Both should succeed + assert v4_2_2_result is not None, f"Failed to inspect v4.2.2 {file_type} TDF" + assert v4_3_1_result is not None, f"Failed to inspect v4.3.1 {file_type} TDF" + + # Both should have either manifest data (full inspection) or basic info (limited inspection) + if "manifest" in v4_2_2_result: + # Full inspection succeeded + assert "manifest" in v4_3_1_result, ( + f"v4.3.1 {file_type} inspection missing manifest while v4.2.2 has it" + ) + # Compare manifest versions (this is where version differences would show) + logger.info( + f"\n=== {file_type.upper()} TDF Comparison (Full Inspection) ===" + ) + logger.info( + f"v4.2.2 manifest keys: {list(v4_2_2_result['manifest'].keys())}" + ) + logger.info( + f"v4.3.1 manifest keys: {list(v4_3_1_result['manifest'].keys())}" + ) + else: + # Limited inspection - check for basic structure + assert "type" in v4_2_2_result, ( + f"v4.2.2 {file_type} inspection missing type" + ) + assert "size" in v4_2_2_result, ( + f"v4.2.2 {file_type} inspection missing size" + ) + assert "type" in v4_3_1_result, ( + f"v4.3.1 {file_type} inspection missing type" + ) + assert "size" in v4_3_1_result, ( + f"v4.3.1 {file_type} inspection missing size" + ) + + logger.info( + f"\n=== {file_type.upper()} TDF Comparison (Limited Inspection) ===" + ) + logger.info( + f"v4.2.2 type: {v4_2_2_result['type']}, size: {v4_2_2_result['size']}" + ) + logger.info( + f"v4.3.1 type: {v4_3_1_result['type']}, size: {v4_3_1_result['size']}" + ) + + +@pytest.mark.integration +def test_cli_inspect_different_file_types( + all_target_mode_tdf_files, temp_credentials_file, project_root, known_target_modes +): + """ + Test CLI inspect with different file types. + """ + assert "v4.2.2" in all_target_mode_tdf_files + assert "v4.3.1" in all_target_mode_tdf_files + + # Check each version has the expected file types + for target_mode in known_target_modes: + tdf_files = all_target_mode_tdf_files[target_mode] + + file_types_to_test = [ + "text", + "binary", + "with_attributes", + ] # TODO: Consider adding "empty" file type as well + + for file_type in file_types_to_test: + tdf_path = tdf_files[file_type] + + # Inspect the TDF + result = run_cli_inspect(tdf_path, temp_credentials_file, project_root) + + assert result is not None, ( + f"Failed to inspect {file_type} TDF, TDF version {target_mode}" + ) + assert "manifest" in result, f"{file_type} TDF inspection missing manifest" + + # Check file-type specific expectations + if file_type == "empty": + # Empty files should still have valid manifests + assert "encryptionInformation" in result["manifest"] + elif file_type == "with_attributes": + # Attributed files should have keyAccess information + assert ( + "keyAccess" in result["manifest"] + or "encryptionInformation" in result["manifest"] + ) diff --git a/tests/integration/test_tdf_reader_integration.py b/tests/integration/otdfctl_to_python/test_tdf_reader_integration.py similarity index 53% rename from tests/integration/test_tdf_reader_integration.py rename to tests/integration/otdfctl_to_python/test_tdf_reader_integration.py index af61986..5ff6118 100644 --- a/tests/integration/test_tdf_reader_integration.py +++ b/tests/integration/otdfctl_to_python/test_tdf_reader_integration.py @@ -4,27 +4,26 @@ import io import json -import pytest -import subprocess import tempfile from pathlib import Path +import pytest + from otdf_python.tdf_reader import ( TDFReader, ) from tests.config_pydantic import CONFIG_TDF - -# Fail fast if OPENTDF_PLATFORM_URL is not set -platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL -if not platform_url: - raise Exception("OPENTDF_PLATFORM_URL must be set in config for integration tests") +from tests.support_common import handle_subprocess_error +from tests.support_otdfctl_args import run_otdfctl_encrypt_command class TestTDFReaderIntegration: """Integration tests for TDFReader with real TDF files created by otdfctl.""" @pytest.mark.integration - def test_read_otdfctl_created_tdf_structure(self, temp_credentials_file): + def test_read_otdfctl_created_tdf_structure( + self, temp_credentials_file, collect_server_logs + ): """Test that TDFReader can parse the structure of files created by otdfctl.""" # Create temporary directory for work @@ -41,28 +40,20 @@ def test_read_otdfctl_created_tdf_structure(self, temp_credentials_file): otdfctl_output = temp_path / "test-reader.txt.tdf" # Run otdfctl encrypt - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_output), - ] - - otdfctl_result = subprocess.run( - otdfctl_cmd, capture_output=True, text=True, cwd=temp_path + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_output, + mime_type="text/plain", + cwd=temp_path, ) - # If otdfctl fails, skip the test (might be server issues) - if otdfctl_result.returncode != 0: - pytest.skip(f"otdfctl encrypt failed: {otdfctl_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) # Verify the TDF file was created assert otdfctl_output.exists(), "otdfctl did not create TDF file" @@ -113,7 +104,9 @@ def test_read_otdfctl_created_tdf_structure(self, temp_credentials_file): assert policy_obj is not None, "Should be able to read policy object" @pytest.mark.integration - def test_read_otdfctl_tdf_with_attributes(self, temp_credentials_file): + def test_read_otdfctl_tdf_with_attributes( + self, temp_credentials_file, collect_server_logs + ): """Test reading TDF files created by otdfctl with data attributes.""" # Create temporary directory for work @@ -130,37 +123,21 @@ def test_read_otdfctl_tdf_with_attributes(self, temp_credentials_file): otdfctl_output = temp_path / "input.txt.tdf" # Run otdfctl encrypt with attributes - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - "--attr", - CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1, - str(input_file), - "-o", - str(otdfctl_output), - ] - - otdfctl_result = subprocess.run( - otdfctl_cmd, capture_output=True, text=True, cwd=temp_path + otdfctl_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_output, + mime_type="text/plain", + attributes=[CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1], + cwd=temp_path, ) - # If otdfctl fails, skip the test - # assert otdfctl_result.returncode == 0, "otdfctl encrypt failed" - if otdfctl_result.returncode != 0: - print(f"otdfctl encrypt failed: {otdfctl_result.stderr}") - # Skip the test - pytest.skip( - f"otdfctl encrypt with attributes failed: {otdfctl_result.stderr}" - ) - else: - print("otdfctl encrypt with attributes succeeded") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt with attributest", + ) # Verify the TDF file was created assert otdfctl_output.exists(), "otdfctl did not create TDF file" @@ -200,7 +177,9 @@ def test_read_otdfctl_tdf_with_attributes(self, temp_credentials_file): ) @pytest.mark.integration - def test_read_multiple_otdfctl_files(self, temp_credentials_file): + def test_read_multiple_otdfctl_files( + self, temp_credentials_file, collect_server_logs + ): """Test reading multiple TDF files of different types created by otdfctl.""" # Create temporary directory for work @@ -226,88 +205,68 @@ def test_read_multiple_otdfctl_files(self, temp_credentials_file): }, ] - successful_tests = 0 - for test_case in test_cases: - try: - # Create input file - input_file = temp_path / f"{test_case['name']}.txt" - if isinstance(test_case["content"], bytes): - with open(input_file, "wb") as f: - f.write(test_case["content"]) - else: - with open(input_file, "w") as f: - f.write(test_case["content"]) - - # Define output file - output_file = temp_path / f"{test_case['name']}.tdf" - - # Run otdfctl encrypt - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - test_case["mime_type"], - str(input_file), - "-o", - str(output_file), - ] - - otdfctl_result = subprocess.run( - otdfctl_cmd, capture_output=True, text=True, cwd=temp_path - ) - - if otdfctl_result.returncode != 0: - continue # Skip this test case but don't fail the whole test + # Create input file + input_file = temp_path / f"{test_case['name']}.txt" + if isinstance(test_case["content"], bytes): + with open(input_file, "wb") as f: + f.write(test_case["content"]) + else: + with open(input_file, "w") as f: + f.write(test_case["content"]) + + # Define output file + output_file = temp_path / f"{test_case['name']}.tdf" + + # Run otdfctl encrypt + otdfctl_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=output_file, + mime_type=test_case["mime_type"], + cwd=temp_path, + ) - # Test TDFReader on this file - with open(output_file, "rb") as f: - tdf_data = f.read() + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_result, + collect_server_logs=collect_server_logs, + scenario_name=f"Test case {test_case['name']}, otdfctl encrypt", + ) - reader = TDFReader(io.BytesIO(tdf_data)) + # Test TDFReader on this file + with open(output_file, "rb") as f: + tdf_data = f.read() - # Basic structure verification - manifest_content = reader.manifest() - assert manifest_content, ( - f"Manifest should not be empty for {test_case['name']}" - ) + reader = TDFReader(io.BytesIO(tdf_data)) - manifest_json = json.loads(manifest_content) - assert "payload" in manifest_json, ( - f"Manifest should contain payload for {test_case['name']}" - ) + # Basic structure verification + manifest_content = reader.manifest() + assert manifest_content, ( + f"Manifest should not be empty for {test_case['name']}" + ) - # Verify MIME type is preserved - payload_info = manifest_json["payload"] - if "mimeType" in payload_info: - assert payload_info["mimeType"] == test_case["mime_type"], ( - f"MIME type should be preserved for {test_case['name']}" - ) - - # Test payload reading - payload_buffer = bytearray(1024) - bytes_read = reader.read_payload_bytes(payload_buffer) - assert bytes_read > 0, ( - f"Should read payload bytes for {test_case['name']}" - ) + manifest_json = json.loads(manifest_content) + assert "payload" in manifest_json, ( + f"Manifest should contain payload for {test_case['name']}" + ) - # Test policy object reading - policy_obj = reader.read_policy_object() - assert policy_obj is not None, ( - f"Should read policy object for {test_case['name']}" + # Verify MIME type is preserved + payload_info = manifest_json["payload"] + if "mimeType" in payload_info: + assert payload_info["mimeType"] == test_case["mime_type"], ( + f"MIME type should be preserved for {test_case['name']}" ) - successful_tests += 1 - - except Exception as e: - # Log the error but continue with other test cases - print(f"Test case {test_case['name']} failed: {e}") - continue + # Test payload reading + payload_buffer = bytearray(1024) + bytes_read = reader.read_payload_bytes(payload_buffer) + assert bytes_read > 0, ( + f"Should read payload bytes for {test_case['name']}" + ) - # Require at least one successful test to pass - assert successful_tests > 0, "At least one test case should succeed" + # Test policy object reading + policy_obj = reader.read_policy_object() + assert policy_obj is not None, ( + f"Should read policy object for {test_case['name']}" + ) diff --git a/tests/integration/test_kas_client_integration.py b/tests/integration/python_only/test_kas_client_integration.py similarity index 99% rename from tests/integration/test_kas_client_integration.py rename to tests/integration/python_only/test_kas_client_integration.py index c904437..97fc723 100644 --- a/tests/integration/test_kas_client_integration.py +++ b/tests/integration/python_only/test_kas_client_integration.py @@ -3,6 +3,7 @@ """ import pytest + from otdf_python.kas_client import KASClient, KeyAccess from otdf_python.kas_key_cache import KASKeyCache from otdf_python.sdk_exceptions import SDKException diff --git a/tests/integration/support_sdk.py b/tests/integration/support_sdk.py index 84dd1e1..0d93ba3 100644 --- a/tests/integration/support_sdk.py +++ b/tests/integration/support_sdk.py @@ -1,7 +1,8 @@ -from otdf_python.sdk_builder import SDKBuilder +import httpx + from otdf_python.sdk import SDK +from otdf_python.sdk_builder import SDKBuilder from tests.config_pydantic import CONFIG_TDF -import httpx def _get_sdk_builder() -> SDKBuilder: diff --git a/tests/integration/test_cli_comparison.py b/tests/integration/test_cli_comparison.py deleted file mode 100644 index 62c1f46..0000000 --- a/tests/integration/test_cli_comparison.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -Test CLI functionality -""" - -import pytest -import subprocess -import tempfile -from pathlib import Path -from tests.config_pydantic import CONFIG_TDF - -# Fail fast if OPENTDF_PLATFORM_URL is not set -platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL -if not platform_url: - raise Exception("OPENTDF_PLATFORM_URL must be set in config for integration tests") - - -@pytest.mark.integration -def test_otdfctl_encrypt_python_decrypt(collect_server_logs, temp_credentials_file): - """Integration test that uses otdfctl for encryption and the Python CLI for decryption""" - - # Create temporary directory for work - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - - # Create input file - input_file = temp_path / "input.txt" - input_content = "Hello, World! This is a test for otdfctl decrypt comparison." - with open(input_file, "w") as f: - f.write(input_content) - - # Define TDF file created by otdfctl - otdfctl_tdf_output = temp_path / "hello-world-otdfctl.txt.tdf" - - # Define decrypted outputs from both tools - otdfctl_decrypt_output = temp_path / "decrypted-by-otdfctl.txt" - cli_decrypt_output = temp_path / "decrypted-by-cli.txt" - - # Run otdfctl encrypt first to create a TDF file - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_encrypt_result = subprocess.run( - otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path - ) - - # If otdfctl encrypt fails, skip the test (might be server issues) - if otdfctl_encrypt_result.returncode != 0: - raise Exception(f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}") - - # Verify the TDF file was created - assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" - assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" - - # Now run otdfctl decrypt (this is the reference implementation) - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] - - otdfctl_decrypt_result = subprocess.run( - otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path - ) - - # Check that otdfctl decrypt succeeded - if otdfctl_decrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when otdfctl decrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in otdfctl_decrypt_result.stderr - or "token endpoint discovery" in otdfctl_decrypt_result.stderr - or "Issuer endpoint must be configured" in otdfctl_decrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {otdfctl_decrypt_result.stderr}" - ) - else: - assert otdfctl_decrypt_result.returncode == 0, ( - f"otdfctl decrypt failed: {otdfctl_decrypt_result.stderr}" - ) - - # Run our Python CLI decrypt on the same TDF - cli_decrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "decrypt", - str(otdfctl_tdf_output), - "-o", - str(cli_decrypt_output), - ] - - cli_decrypt_result = subprocess.run( - cli_decrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - ) - - # Check that our CLI succeeded - if cli_decrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when Python CLI decrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in cli_decrypt_result.stderr - or "token endpoint discovery" in cli_decrypt_result.stderr - or "Issuer endpoint must be configured" in cli_decrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {cli_decrypt_result.stderr}" - ) - else: - assert cli_decrypt_result.returncode == 0, ( - f"Python CLI decrypt failed: {cli_decrypt_result.stderr}" - ) - - # Verify both decrypted files were created - assert otdfctl_decrypt_output.exists(), "otdfctl did not create decrypted file" - assert otdfctl_decrypt_output.stat().st_size > 0, ( - "otdfctl created empty decrypted file" - ) - assert cli_decrypt_output.exists(), "Python CLI did not create decrypted file" - assert cli_decrypt_output.stat().st_size > 0, ( - "Python CLI created empty decrypted file" - ) - - # Verify both tools produce the same decrypted content - with open(otdfctl_decrypt_output) as f: - otdfctl_decrypted_content = f.read() - with open(cli_decrypt_output) as f: - cli_decrypted_content = f.read() - - # Both should match the original content - assert otdfctl_decrypted_content == input_content, ( - f"otdfctl decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{otdfctl_decrypted_content}'" - ) - assert cli_decrypted_content == input_content, ( - f"Python CLI decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{cli_decrypted_content}'" - ) - - # Both tools should produce identical results - assert otdfctl_decrypted_content == cli_decrypted_content, ( - f"Decrypted content differs between tools. " - f"otdfctl: '{otdfctl_decrypted_content}', Python CLI: '{cli_decrypted_content}'" - ) - - print( - "✓ Both otdfctl and Python CLI successfully decrypted the TDF with identical results" - ) - - -@pytest.mark.integration -def test_otdfctl_encrypt_otdfctl_decrypt(collect_server_logs, temp_credentials_file): - """Integration test that uses otdfctl for both encryption and decryption to verify roundtrip functionality""" - - # Create temporary directory for work - with tempfile.TemporaryDirectory() as temp_dir: - temp_path = Path(temp_dir) - - # Create input file - input_file = temp_path / "input.txt" - input_content = ( - "Hello, World! This is a test for otdfctl roundtrip encryption/decryption." - ) - with open(input_file, "w") as f: - f.write(input_content) - - # Define TDF file and decrypted output - otdfctl_tdf_output = temp_path / "otdfctl-roundtrip.txt.tdf" - otdfctl_decrypt_output = temp_path / "otdfctl-roundtrip-decrypted.txt" - - # Run otdfctl encrypt - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_encrypt_result = subprocess.run( - otdfctl_encrypt_cmd, capture_output=True, text=True, cwd=temp_path - ) - - # If otdfctl encrypt fails, skip the test (might be server issues) - if otdfctl_encrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when otdfctl encrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in otdfctl_encrypt_result.stderr - or "token endpoint discovery" in otdfctl_encrypt_result.stderr - or "Issuer endpoint must be configured" in otdfctl_encrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {otdfctl_encrypt_result.stderr}" - ) - else: - assert otdfctl_encrypt_result.returncode == 0, ( - f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}" - ) - - # Verify the TDF file was created - assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" - assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" - - # Verify TDF file has correct ZIP signature - with open(otdfctl_tdf_output, "rb") as f: - tdf_header = f.read(4) - assert tdf_header == b"PK\x03\x04", "otdfctl output is not a valid ZIP file" - - # Run otdfctl decrypt - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] - - otdfctl_decrypt_result = subprocess.run( - otdfctl_decrypt_cmd, capture_output=True, text=True, cwd=temp_path - ) - - # Check that otdfctl decrypt succeeded - if otdfctl_decrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when otdfctl decrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in otdfctl_decrypt_result.stderr - or "token endpoint discovery" in otdfctl_decrypt_result.stderr - or "Issuer endpoint must be configured" in otdfctl_decrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {otdfctl_decrypt_result.stderr}" - ) - else: - assert otdfctl_decrypt_result.returncode == 0, ( - f"otdfctl decrypt failed: {otdfctl_decrypt_result.stderr}" - ) - - # Verify the decrypted file was created - assert otdfctl_decrypt_output.exists(), "otdfctl did not create decrypted file" - assert otdfctl_decrypt_output.stat().st_size > 0, ( - "otdfctl created empty decrypted file" - ) - - # Verify the decrypted content matches the original - with open(otdfctl_decrypt_output) as f: - decrypted_content = f.read() - - assert decrypted_content == input_content, ( - f"otdfctl roundtrip failed - decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{decrypted_content}'" - ) - - # Verify file sizes are reasonable - original_size = input_file.stat().st_size - tdf_size = otdfctl_tdf_output.stat().st_size - decrypted_size = otdfctl_decrypt_output.stat().st_size - - assert tdf_size > original_size, "TDF file should be larger than original" - assert decrypted_size == original_size, ( - "Decrypted file should match original size" - ) - - print( - f"✓ otdfctl roundtrip successful: {original_size} bytes -> {tdf_size} bytes -> {decrypted_size} bytes" - ) - - -if __name__ == "__main__": - pytest.main([__file__]) diff --git a/tests/integration/test_cli_integration.py b/tests/integration/test_cli_integration.py index dbbbfc4..9201e8e 100644 --- a/tests/integration/test_cli_integration.py +++ b/tests/integration/test_cli_integration.py @@ -2,26 +2,28 @@ Integration Test CLI functionality """ -import pytest -import subprocess -import sys import tempfile from pathlib import Path -from tests.config_pydantic import CONFIG_TDF -import os - -original_env = os.environ.copy() -original_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" +import pytest -# Fail fast if OPENTDF_PLATFORM_URL is not set -platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL -if not platform_url: - raise Exception("OPENTDF_PLATFORM_URL must be set in config for integration tests") +from tests.support_cli_args import run_cli_decrypt, run_cli_encrypt +from tests.support_common import ( + compare_tdf3_file_size, + handle_subprocess_error, + validate_plaintext_file_created, + validate_tdf3_file, +) +from tests.support_otdfctl_args import ( + run_otdfctl_decrypt_command, + run_otdfctl_encrypt_command, +) @pytest.mark.integration -def test_cli_decrypt_otdfctl_tdf(temp_credentials_file): +def test_cli_decrypt_otdfctl_tdf( + collect_server_logs, temp_credentials_file, project_root +): """ Test that the Python CLI can successfully decrypt TDF files created by otdfctl. """ @@ -43,84 +45,49 @@ def test_cli_decrypt_otdfctl_tdf(temp_credentials_file): cli_decrypt_output = temp_path / "decrypted-by-cli.txt" # Run otdfctl encrypt - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_result = subprocess.run( - otdfctl_encrypt_cmd, - capture_output=True, - text=True, + otdfctl_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", cwd=temp_path, - env=original_env, ) - # If otdfctl fails, skip the test (might be server issues) - if otdfctl_result.returncode != 0: - raise Exception(f"otdfctl encrypt failed: {otdfctl_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) - # Verify the TDF file was created - assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" - assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" + validate_tdf3_file(otdfctl_tdf_output, "otdfctl") # Run our Python CLI decrypt on the otdfctl-created TDF - cli_decrypt_cmd = [ - sys.executable, - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "decrypt", - str(otdfctl_tdf_output), - "-o", - str(cli_decrypt_output), - ] - - cli_decrypt_result = subprocess.run( - cli_decrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, - ) - - # Check that our CLI succeeded - assert cli_decrypt_result.returncode == 0, ( - f"Python CLI decrypt failed: {cli_decrypt_result.stderr}" + cli_decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=otdfctl_tdf_output, + output_file=cli_decrypt_output, + cwd=project_root, ) - # Verify the decrypted file was created - assert cli_decrypt_output.exists(), "Python CLI did not create decrypted file" - assert cli_decrypt_output.stat().st_size > 0, ( - "Python CLI created empty decrypted file" + # Fail fast on errors + handle_subprocess_error( + result=cli_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt", ) - # Verify the content matches the original - with open(cli_decrypt_output) as f: - decrypted_content = f.read() - - assert decrypted_content == input_content, ( - f"Decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{decrypted_content}'" + validate_plaintext_file_created( + path=cli_decrypt_output, + scenario="Python decrypt", + expected_content=input_content, ) @pytest.mark.integration -def test_otdfctl_decrypt_comparison(collect_server_logs, temp_credentials_file): +def test_otdfctl_decrypt_comparison( + collect_server_logs, temp_credentials_file, project_root +): """ Test comparative decryption between otdfctl and Python CLI on the same TDF. """ @@ -143,142 +110,61 @@ def test_otdfctl_decrypt_comparison(collect_server_logs, temp_credentials_file): cli_decrypt_output = temp_path / "decrypted-by-cli.txt" # Run otdfctl encrypt first to create a TDF file - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_encrypt_result = subprocess.run( - otdfctl_encrypt_cmd, - capture_output=True, - text=True, + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", cwd=temp_path, - env=original_env, ) - # If otdfctl encrypt fails, skip the test (might be server issues) - if otdfctl_encrypt_result.returncode != 0: - raise Exception(f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) - # Verify the TDF file was created - assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" - assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" + validate_tdf3_file(otdfctl_tdf_output, "otdfctl") # Now run otdfctl decrypt (this is the reference implementation) - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] - - otdfctl_decrypt_result = subprocess.run( - otdfctl_decrypt_cmd, - capture_output=True, - text=True, + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, cwd=temp_path, - env=original_env, ) - # Check that otdfctl decrypt succeeded - assert otdfctl_decrypt_result.returncode == 0, ( - f"otdfctl decrypt failed: {otdfctl_decrypt_result.stderr}" + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt", ) - # Run our Python CLI decrypt on the same TDF - cli_decrypt_cmd = [ - sys.executable, - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "decrypt", - str(otdfctl_tdf_output), - "-o", - str(cli_decrypt_output), - ] - - cli_decrypt_result = subprocess.run( - cli_decrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, + cli_decrypt_result = run_cli_decrypt( + creds_file=temp_credentials_file, + input_file=otdfctl_tdf_output, + output_file=cli_decrypt_output, + cwd=project_root, ) - # Check that our CLI succeeded - if cli_decrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when Python CLI decrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in cli_decrypt_result.stderr - or "token endpoint discovery" in cli_decrypt_result.stderr - or "Issuer endpoint must be configured" in cli_decrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {cli_decrypt_result.stderr}" - ) - else: - assert cli_decrypt_result.returncode == 0, ( - f"Python CLI decrypt failed: {cli_decrypt_result.stderr}" - ) - - # Verify both decrypted files were created - assert otdfctl_decrypt_output.exists(), "otdfctl did not create decrypted file" - assert otdfctl_decrypt_output.stat().st_size > 0, ( - "otdfctl created empty decrypted file" - ) - assert cli_decrypt_output.exists(), "Python CLI did not create decrypted file" - assert cli_decrypt_output.stat().st_size > 0, ( - "Python CLI created empty decrypted file" + # Fail fast on errors + handle_subprocess_error( + result=cli_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI decrypt", ) - # Verify both tools produce the same decrypted content - with open(otdfctl_decrypt_output) as f: - otdfctl_decrypted_content = f.read() - with open(cli_decrypt_output) as f: - cli_decrypted_content = f.read() - - # Both should match the original content - assert otdfctl_decrypted_content == input_content, ( - f"otdfctl decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{otdfctl_decrypted_content}'" + validate_plaintext_file_created( + path=otdfctl_decrypt_output, + scenario="otdfctl", + expected_content=input_content, ) - assert cli_decrypted_content == input_content, ( - f"Python CLI decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{cli_decrypted_content}'" - ) - - # Both tools should produce identical results - assert otdfctl_decrypted_content == cli_decrypted_content, ( - f"Decrypted content differs between tools. " - f"otdfctl: '{otdfctl_decrypted_content}', Python CLI: '{cli_decrypted_content}'" - ) - - print( - "✓ Both otdfctl and Python CLI successfully decrypted the TDF with identical results" + validate_plaintext_file_created( + path=cli_decrypt_output, + scenario="Python CLI", + expected_content=input_content, ) @@ -305,113 +191,43 @@ def test_otdfctl_encrypt_decrypt_roundtrip(collect_server_logs, temp_credentials otdfctl_decrypt_output = temp_path / "otdfctl-roundtrip-decrypted.txt" # Run otdfctl encrypt - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_encrypt_result = subprocess.run( - otdfctl_encrypt_cmd, - capture_output=True, - text=True, + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", cwd=temp_path, - env=original_env, ) - # If otdfctl encrypt fails, skip the test (might be server issues) - if otdfctl_encrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when otdfctl encrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in otdfctl_encrypt_result.stderr - or "token endpoint discovery" in otdfctl_encrypt_result.stderr - or "Issuer endpoint must be configured" in otdfctl_encrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {otdfctl_encrypt_result.stderr}" - ) - else: - assert otdfctl_encrypt_result.returncode == 0, ( - f"otdfctl encrypt failed: {otdfctl_encrypt_result.stderr}" - ) + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_encrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) # Verify the TDF file was created - assert otdfctl_tdf_output.exists(), "otdfctl did not create TDF file" - assert otdfctl_tdf_output.stat().st_size > 0, "otdfctl created empty TDF file" - - # Verify TDF file has correct ZIP signature - with open(otdfctl_tdf_output, "rb") as f: - tdf_header = f.read(4) - assert tdf_header == b"PK\x03\x04", "otdfctl output is not a valid ZIP file" + validate_tdf3_file(otdfctl_tdf_output, "otdfctl") # Run otdfctl decrypt - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - str(otdfctl_tdf_output), - "-o", - str(otdfctl_decrypt_output), - ] - - otdfctl_decrypt_result = subprocess.run( - otdfctl_decrypt_cmd, - capture_output=True, - text=True, + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + temp_credentials_file, + otdfctl_tdf_output, + otdfctl_decrypt_output, cwd=temp_path, - env=original_env, ) - # Check that otdfctl decrypt succeeded - if otdfctl_decrypt_result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when otdfctl decrypt failed:\n{logs}") - - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in otdfctl_decrypt_result.stderr - or "token endpoint discovery" in otdfctl_decrypt_result.stderr - or "Issuer endpoint must be configured" in otdfctl_decrypt_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {otdfctl_decrypt_result.stderr}" - ) - else: - assert otdfctl_decrypt_result.returncode == 0, ( - f"otdfctl decrypt failed: {otdfctl_decrypt_result.stderr}" - ) - - # Verify the decrypted file was created - assert otdfctl_decrypt_output.exists(), "otdfctl did not create decrypted file" - assert otdfctl_decrypt_output.stat().st_size > 0, ( - "otdfctl created empty decrypted file" + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_decrypt_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl decrypt", ) - # Verify the decrypted content matches the original - with open(otdfctl_decrypt_output) as f: - decrypted_content = f.read() - - assert decrypted_content == input_content, ( - f"otdfctl roundtrip failed - decrypted content does not match original. " - f"Expected: '{input_content}', Got: '{decrypted_content}'" + validate_plaintext_file_created( + path=otdfctl_decrypt_output, + scenario="otdfctl", + expected_content=input_content, ) # Verify file sizes are reasonable @@ -430,14 +246,10 @@ def test_otdfctl_encrypt_decrypt_roundtrip(collect_server_logs, temp_credentials @pytest.mark.integration -def test_cli_encrypt_integration(temp_credentials_file): +def test_cli_encrypt_integration( + collect_server_logs, temp_credentials_file, project_root +): """Integration test comparing our CLI with otdfctl""" - # Skip if OPENTDF_PLATFORM_URL is not set - platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL - if not platform_url: - raise Exception( - "OPENTDF_PLATFORM_URL must be set in config for integration tests" - ) # Create temporary directory for work with tempfile.TemporaryDirectory() as temp_dir: @@ -454,93 +266,39 @@ def test_cli_encrypt_integration(temp_credentials_file): cli_output = temp_path / "hello-world-cli.txt.tdf" # Run otdfctl encrypt - otdfctl_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--tls-no-verify", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_output), - ] - - otdfctl_result = subprocess.run( - otdfctl_cmd, capture_output=True, text=True, cwd=temp_path + otdfctl_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_output, + mime_type="text/plain", + cwd=temp_path, ) - # If otdfctl fails, skip the test (might be server issues) - if otdfctl_result.returncode != 0: - raise Exception(f"otdfctl failed: {otdfctl_result.stderr}") + # Fail fast on errors + handle_subprocess_error( + result=otdfctl_result, + collect_server_logs=collect_server_logs, + scenario_name="otdfctl encrypt", + ) # Run our Python CLI encrypt - cli_cmd = [ - sys.executable, - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - "--insecure", # equivalent to --tls-no-verify - "encrypt", - "--mime-type", - "text/plain", - "--container-type", - "tdf", # to match otdfctl behavior - str(input_file), - "-o", - str(cli_output), - ] - - cli_result = subprocess.run( - cli_cmd, capture_output=True, text=True, cwd=Path(__file__).parent.parent + cli_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=cli_output, + mime_type="text/plain", + attributes=None, + cwd=project_root, ) - # Check that our CLI succeeded - if cli_result.returncode != 0: - # Check if this is a server connectivity issue - if ( - "401 Unauthorized" in cli_result.stderr - or "token endpoint discovery" in cli_result.stderr - or "Issuer endpoint must be configured" in cli_result.stderr - ): - pytest.skip( - f"Server connectivity or authentication issue: {cli_result.stderr}" - ) - - else: - assert cli_result.returncode == 0, ( - f"Python CLI failed: {cli_result.stderr}" - ) - - # Both output files should exist - assert otdfctl_output.exists(), "otdfctl output file does not exist" - assert cli_output.exists(), "Python CLI output file does not exist" - - # Both files should be non-empty and similar in size - otdfctl_size = otdfctl_output.stat().st_size - cli_size = cli_output.stat().st_size - - assert otdfctl_size > 0, "otdfctl output is empty" - assert cli_size > 0, "Python CLI output is empty" - - # Files should be reasonably similar in size (within 50% of each other) - # This accounts for potential differences in metadata or formatting - size_diff_ratio = abs(otdfctl_size - cli_size) / max(otdfctl_size, cli_size) - assert size_diff_ratio < 0.3, ( - f"File sizes too different: otdfctl={otdfctl_size}, cli={cli_size}" + # Fail fast on errors + handle_subprocess_error( + result=cli_result, + collect_server_logs=collect_server_logs, + scenario_name="Python CLI encrypt", ) - # Both files should start with ZIP signature (TDF format) - with open(otdfctl_output, "rb") as f: - otdfctl_header = f.read(4) - with open(cli_output, "rb") as f: - cli_header = f.read(4) + validate_tdf3_file(otdfctl_output, "otdfctl") + validate_tdf3_file(cli_output, "Python CLI") - assert otdfctl_header == b"PK\x03\x04", "otdfctl output is not a valid ZIP file" - assert cli_header == b"PK\x03\x04", "Python CLI output is not a valid ZIP file" + compare_tdf3_file_size(otdfctl_output, cli_output) diff --git a/tests/integration/test_cli_tdf_validation.py b/tests/integration/test_cli_tdf_validation.py index f500aaa..ff6dfd5 100644 --- a/tests/integration/test_cli_tdf_validation.py +++ b/tests/integration/test_cli_tdf_validation.py @@ -2,35 +2,27 @@ Test CLI encryption functionality and TDF validation """ -from otdf_python.tdf_reader import TDF_MANIFEST_FILE_NAME, TDF_PAYLOAD_FILE_NAME -import pytest -import subprocess -import tempfile import json -from pathlib import Path -from tests.config_pydantic import CONFIG_TDF +import tempfile import zipfile -import os - -original_env = os.environ.copy() -original_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" +from pathlib import Path -# Fail fast if OPENTDF_PLATFORM_URL is not set -platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL -if not platform_url: - raise Exception("OPENTDF_PLATFORM_URL must be set in config for integration tests") +import pytest -# Determine CLI flags based on platform URL -cli_flags = [] -otdfctl_flags = [] -if platform_url.startswith("http://"): - cli_flags = ["--plaintext"] - # otdfctl doesn't have a --plaintext flag, just omit --tls-no-verify for HTTP -else: - # For HTTPS, skip TLS verification if INSECURE_SKIP_VERIFY is True - if CONFIG_TDF.INSECURE_SKIP_VERIFY: - cli_flags = ["--insecure"] # equivalent to --tls-no-verify - otdfctl_flags = ["--tls-no-verify"] +from otdf_python.tdf_reader import TDF_MANIFEST_FILE_NAME, TDF_PAYLOAD_FILE_NAME +from tests.support_cli_args import ( + run_cli_decrypt, + run_cli_encrypt, +) +from tests.support_common import ( + handle_subprocess_error, + validate_plaintext_file_created, + validate_tdf3_file, +) +from tests.support_otdfctl_args import ( + run_otdfctl_decrypt_command, + run_otdfctl_encrypt_command, +) def _create_test_input_file(temp_path: Path, content: str) -> Path: @@ -41,18 +33,6 @@ def _create_test_input_file(temp_path: Path, content: str) -> Path: return input_file -def _validate_tdf_file(tdf_path: Path, tool_name: str) -> None: - """Validate that a TDF file exists, is not empty, and has correct ZIP structure.""" - assert tdf_path.exists(), f"{tool_name} did not create TDF file" - assert tdf_path.stat().st_size > 0, f"{tool_name} created empty TDF file" - assert zipfile.is_zipfile(tdf_path), f"{tool_name} output is not a valid ZIP file" - - # Verify TDF file has correct ZIP signature - with open(tdf_path, "rb") as f: - tdf_header = f.read(4) - assert tdf_header == b"PK\x03\x04", f"{tool_name} output is not a valid ZIP file" - - def _validate_key_access_objects(key_access: list) -> None: """Validate the keyAccessObjects (or KAO) structure in the TDF manifest.""" # New format - keyAccess is an array @@ -249,21 +229,8 @@ def _validate_tdf_zip_structure(tdf_path: Path) -> None: print("=" * 50) -def _handle_subprocess_error( - result: subprocess.CompletedProcess, collect_server_logs, tool_name: str -) -> None: - """Handle subprocess errors with proper server log collection and error reporting.""" - if result.returncode != 0: - # Collect server logs for debugging - logs = collect_server_logs() - print(f"Server logs when {tool_name} failed:\n{logs}") - - assert result.returncode == 0, f"{tool_name} failed: {result.stderr}" - - def _run_otdfctl_decrypt( tdf_path: Path, - platform_url: str, creds_file: Path, temp_path: Path, collect_server_logs, @@ -272,102 +239,46 @@ def _run_otdfctl_decrypt( """Run otdfctl decrypt on a TDF file and verify the decrypted content matches expected.""" decrypt_output = temp_path / f"{tdf_path.stem}_decrypted.txt" - otdfctl_decrypt_cmd = [ - "otdfctl", - "decrypt", - str(tdf_path), - "--host", - platform_url, - "--with-client-creds-file", - str(creds_file), - *otdfctl_flags, - "-o", - str(decrypt_output), - ] - - otdfctl_decrypt_result = subprocess.run( - otdfctl_decrypt_cmd, - capture_output=True, - text=True, + otdfctl_decrypt_result = run_otdfctl_decrypt_command( + creds_file=creds_file, + tdf_file=tdf_path, + output_file=decrypt_output, cwd=temp_path, - env=original_env, ) - _handle_subprocess_error( + handle_subprocess_error( otdfctl_decrypt_result, collect_server_logs, "otdfctl decrypt" ) - # Verify the decrypted file was created - assert decrypt_output.exists(), "otdfctl did not create decrypted file" - assert decrypt_output.stat().st_size > 0, "otdfctl created empty decrypted file" - - # Verify the decrypted content matches expected - with open(decrypt_output) as f: - decrypted_content = f.read() - - assert decrypted_content == expected_content, ( - f"Decrypted content does not match original. " - f"Expected: '{expected_content}', Got: '{decrypted_content}'" + validate_plaintext_file_created( + path=decrypt_output, scenario="otdfctl", expected_content=expected_content ) - print("✓ otdfctl successfully decrypted TDF with correct content") return decrypt_output def _run_python_cli_decrypt( tdf_path: Path, - platform_url: str, creds_file: Path, temp_path: Path, collect_server_logs, expected_content: str, + cwd: Path, ) -> Path: """Run Python CLI decrypt on a TDF file and verify the decrypted content matches expected.""" decrypt_output = temp_path / f"{tdf_path.stem}_python_decrypted.txt" - python_decrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(creds_file), - *cli_flags, - "decrypt", - str(tdf_path), - "-o", - str(decrypt_output), - ] - - python_decrypt_result = subprocess.run( - python_decrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, + python_decrypt_result = run_cli_decrypt( + creds_file=creds_file, input_file=tdf_path, output_file=decrypt_output, cwd=cwd ) - _handle_subprocess_error( + handle_subprocess_error( python_decrypt_result, collect_server_logs, "Python CLI decrypt" ) - # Verify the decrypted file was created - assert decrypt_output.exists(), "Python CLI did not create decrypted file" - assert decrypt_output.stat().st_size > 0, "Python CLI created empty decrypted file" - - # Verify the decrypted content matches expected - with open(decrypt_output) as f: - decrypted_content = f.read() - - assert decrypted_content == expected_content, ( - f"Decrypted content does not match original. " - f"Expected: '{expected_content}', Got: '{decrypted_content}'" + validate_plaintext_file_created( + path=decrypt_output, scenario="Python CLI", expected_content=expected_content ) - - print("✓ Python CLI successfully decrypted TDF with correct content") return decrypt_output @@ -387,42 +298,26 @@ def test_otdfctl_encrypt_with_validation(collect_server_logs, temp_credentials_f otdfctl_tdf_output = temp_path / "otdfctl_test.txt.tdf" # Run otdfctl encrypt to create a TDF file - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *otdfctl_flags, - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_encrypt_result = subprocess.run( - otdfctl_encrypt_cmd, - capture_output=True, - text=True, + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", cwd=temp_path, - env=original_env, ) # Handle any encryption errors - _handle_subprocess_error( + handle_subprocess_error( otdfctl_encrypt_result, collect_server_logs, "otdfctl encrypt" ) # Validate the TDF file structure - _validate_tdf_file(otdfctl_tdf_output, "otdfctl") + validate_tdf3_file(otdfctl_tdf_output, "otdfctl") _validate_tdf_zip_structure(otdfctl_tdf_output) # Test that the TDF can be decrypted successfully _run_otdfctl_decrypt( otdfctl_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -434,7 +329,7 @@ def test_otdfctl_encrypt_with_validation(collect_server_logs, temp_credentials_f @pytest.mark.integration -def test_python_encrypt(collect_server_logs, temp_credentials_file): +def test_python_encrypt(collect_server_logs, temp_credentials_file, project_root): """Integration test that uses Python CLI for encryption only and verifies the TDF can be inspected""" # Create temporary directory for work @@ -449,46 +344,25 @@ def test_python_encrypt(collect_server_logs, temp_credentials_file): python_tdf_output = temp_path / "python_cli_test.txt.tdf" # Run Python CLI encrypt to create a TDF file - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] - - python_encrypt_result = subprocess.run( - python_encrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, + python_encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + cwd=project_root, ) # Handle any encryption errors - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, "Python CLI encrypt" ) # Validate the TDF file structure - _validate_tdf_file(python_tdf_output, "Python CLI") + validate_tdf3_file(python_tdf_output, "Python CLI") _validate_tdf_zip_structure(python_tdf_output) # Test that the TDF can be decrypted by otdfctl _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -502,7 +376,9 @@ def test_python_encrypt(collect_server_logs, temp_credentials_file): @pytest.mark.integration -def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): +def test_cross_tool_compatibility( + collect_server_logs, temp_credentials_file, project_root +): """Test that TDFs created by one tool can be decrypted by the other.""" # Create temporary directory for work @@ -517,30 +393,15 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): otdfctl_tdf_output = temp_path / "otdfctl_for_python_decrypt.txt.tdf" # Encrypt with otdfctl - otdfctl_encrypt_cmd = [ - "otdfctl", - "encrypt", - "--host", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *otdfctl_flags, - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(otdfctl_tdf_output), - ] - - otdfctl_encrypt_result = subprocess.run( - otdfctl_encrypt_cmd, - capture_output=True, - text=True, + otdfctl_encrypt_result = run_otdfctl_encrypt_command( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=otdfctl_tdf_output, + mime_type="text/plain", cwd=temp_path, - env=original_env, ) - _handle_subprocess_error( + handle_subprocess_error( otdfctl_encrypt_result, collect_server_logs, "otdfctl encrypt (cross-tool test)", @@ -549,45 +410,25 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): # Decrypt with Python CLI _run_python_cli_decrypt( otdfctl_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, input_content, + project_root, ) # Test 2: Python CLI encrypt -> otdfctl decrypt python_tdf_output = temp_path / "python_for_otdfctl_decrypt.txt.tdf" # Encrypt with Python CLI - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] - - python_encrypt_result = subprocess.run( - python_encrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, + python_encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + cwd=project_root, ) - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, "Python CLI encrypt (cross-tool test)", @@ -596,7 +437,6 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): # Decrypt with otdfctl _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -609,7 +449,9 @@ def test_cross_tool_compatibility(collect_server_logs, temp_credentials_file): @pytest.mark.integration -def test_different_content_types(collect_server_logs, temp_credentials_file): +def test_different_content_types( + collect_server_logs, temp_credentials_file, project_root +): """Test encryption/decryption with different types of content.""" test_cases = [ @@ -635,46 +477,25 @@ def test_different_content_types(collect_server_logs, temp_credentials_file): # Test with Python CLI python_tdf_output = temp_path / f"python_{filename}.tdf" - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] - - python_encrypt_result = subprocess.run( - python_encrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, + python_encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + cwd=project_root, ) - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, f"Python CLI encrypt ({filename})", ) # Validate TDF structure - _validate_tdf_file(python_tdf_output, f"Python CLI ({filename})") + validate_tdf3_file(python_tdf_output, f"Python CLI ({filename})") # Decrypt and validate content _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, @@ -688,7 +509,9 @@ def test_different_content_types(collect_server_logs, temp_credentials_file): @pytest.mark.skip("Skipping test for now due to known issues with empty content") @pytest.mark.integration -def test_different_content_types_empty(collect_server_logs, temp_credentials_file): +def test_different_content_types_empty( + collect_server_logs, temp_credentials_file, project_root +): """Test encryption/decryption with different types of content.""" test_cases = [ @@ -711,46 +534,25 @@ def test_different_content_types_empty(collect_server_logs, temp_credentials_fil # Test with Python CLI python_tdf_output = temp_path / f"python_{filename}.tdf" - python_encrypt_cmd = [ - "uv", - "run", - "python", - "-m", - "otdf_python", - "--platform-url", - platform_url, - "--with-client-creds-file", - str(temp_credentials_file), - *cli_flags, - "encrypt", - "--mime-type", - "text/plain", - str(input_file), - "-o", - str(python_tdf_output), - ] - - python_encrypt_result = subprocess.run( - python_encrypt_cmd, - capture_output=True, - text=True, - cwd=Path(__file__).parent.parent, - env=original_env, + python_encrypt_result = run_cli_encrypt( + creds_file=temp_credentials_file, + input_file=input_file, + output_file=python_tdf_output, + cwd=project_root, ) - _handle_subprocess_error( + handle_subprocess_error( python_encrypt_result, collect_server_logs, f"Python CLI encrypt ({filename})", ) # Validate TDF structure - _validate_tdf_file(python_tdf_output, f"Python CLI ({filename})") + validate_tdf3_file(python_tdf_output, f"Python CLI ({filename})") # Decrypt and validate content _run_otdfctl_decrypt( python_tdf_output, - platform_url, temp_credentials_file, temp_path, collect_server_logs, diff --git a/tests/integration/test_data/empty_file.txt b/tests/integration/test_data/empty_file.txt new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/test_data/sample_binary.png b/tests/integration/test_data/sample_binary.png new file mode 100644 index 0000000..eb979e8 Binary files /dev/null and b/tests/integration/test_data/sample_binary.png differ diff --git a/tests/integration/test_data/sample_text.txt b/tests/integration/test_data/sample_text.txt new file mode 100644 index 0000000..5e103d5 --- /dev/null +++ b/tests/integration/test_data/sample_text.txt @@ -0,0 +1,5 @@ +Hello, World! This is a test text file for TDF target mode testing. +It contains multiple lines to test various scenarios. +Line 3 with special characters: àáâãäå +Line 4 with numbers: 123456789 +Final line with punctuation: !@#$%^&*()_+ diff --git a/tests/integration/test_data/sample_with_attributes.txt b/tests/integration/test_data/sample_with_attributes.txt new file mode 100644 index 0000000..05d84ec --- /dev/null +++ b/tests/integration/test_data/sample_with_attributes.txt @@ -0,0 +1,4 @@ +Sensitive data with attributes. +Classification: SECRET +Department: Engineering +Project: TDF Testing diff --git a/tests/integration/test_pe_interaction.py b/tests/integration/test_pe_interaction.py index 02418a3..00e4447 100644 --- a/tests/integration/test_pe_interaction.py +++ b/tests/integration/test_pe_interaction.py @@ -5,11 +5,12 @@ import logging import tempfile from pathlib import Path + import pytest from otdf_python.sdk import SDK -from tests.config_pydantic import CONFIG_TDF from otdf_python.sdk_exceptions import SDKException +from tests.config_pydantic import CONFIG_TDF from tests.integration.support_sdk import get_sdk_for_pe # Test files (adjust paths as needed) diff --git a/tests/mock_crypto.py b/tests/mock_crypto.py index 0209f67..006963b 100644 --- a/tests/mock_crypto.py +++ b/tests/mock_crypto.py @@ -1,5 +1,5 @@ -from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa def generate_rsa_keypair(): diff --git a/tests/server_logs.py b/tests/server_logs.py index 34d424d..0bc846d 100644 --- a/tests/server_logs.py +++ b/tests/server_logs.py @@ -2,8 +2,8 @@ Server log collection utility for debugging test failures. """ -import subprocess import logging +import subprocess from tests.config_pydantic import CONFIG_TESTING diff --git a/tests/support_cli_args.py b/tests/support_cli_args.py new file mode 100644 index 0000000..ab2900d --- /dev/null +++ b/tests/support_cli_args.py @@ -0,0 +1,183 @@ +""" +Support functions for constructing CLI arguments for this project's (Python) CLI. +""" + +import json +import logging +import subprocess +import sys +from pathlib import Path + +from tests.config_pydantic import CONFIG_TDF +from tests.support_common import get_platform_url, get_testing_environ + +logger = logging.getLogger(__name__) + + +def _get_cli_flags() -> list[str]: + """ + Determine (Python) CLI flags based on platform URL + """ + platform_url = get_platform_url() + cli_flags = [] + + if platform_url.startswith("http://"): + cli_flags = ["--plaintext"] + else: + # For HTTPS, skip TLS verification if INSECURE_SKIP_VERIFY is True + if CONFIG_TDF.INSECURE_SKIP_VERIFY: + cli_flags = ["--insecure"] + + return cli_flags + + +def run_cli_inspect(tdf_path: Path, creds_file: Path, cwd: Path) -> dict: + """ + Helper function to run Python CLI inspect command and return parsed JSON result. + + This demonstrates how the CLI inspect functionality could be tested + with the new fixtures. + """ + + # Build CLI command + cmd = [ + sys.executable, + "-m", + "otdf_python", + "--platform-url", + get_platform_url(), + "--with-client-creds-file", + str(creds_file), + *_get_cli_flags(), + "inspect", + str(tdf_path), + ] + + try: + # Run the CLI command + result = subprocess.run( + cmd, capture_output=True, text=True, check=True, cwd=cwd + ) + + # Parse JSON output + return json.loads(result.stdout) + + except (subprocess.CalledProcessError, json.JSONDecodeError) as e: + logger.error(f"CLI inspect failed for {tdf_path}: {e}") + raise Exception(f"Failed to inspect TDF {tdf_path}: {e}") from e + + +def _build_cli_decrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + platform_url: str | None = None, +) -> list[str]: + """Build CLI decrypt command.""" + cmd = [ + sys.executable, + "-m", + "otdf_python", + "--platform-url", + platform_url if platform_url is not None else get_platform_url(), + "--with-client-creds-file", + str(creds_file), + *_get_cli_flags(), + "decrypt", + str(input_file), + "-o", + str(output_file), + ] + return cmd + + +def run_cli_decrypt( + creds_file: Path, + input_file: Path, + output_file: Path, + cwd: Path, + platform_url: str | None = None, +) -> subprocess.CompletedProcess: + python_decrypt_cmd = _build_cli_decrypt_command( + creds_file=creds_file, + input_file=input_file, + output_file=output_file, + platform_url=platform_url, + ) + return subprocess.run( + python_decrypt_cmd, + capture_output=True, + text=True, + cwd=cwd, + env=get_testing_environ(), + ) + + +def _build_cli_encrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + platform_url: str | None = None, + mime_type: str = "text/plain", + attributes: list[str] | None = None, + container_type: str = "tdf", +) -> list[str]: + cmd = [ + sys.executable, + "-m", + "otdf_python", + "--platform-url", + platform_url if platform_url is not None else get_platform_url(), + "--with-client-creds-file", + str(creds_file), + *_get_cli_flags(), + "encrypt", + "--mime-type", + mime_type, + "--container-type", + container_type, + ] + + # Add attributes if provided + if attributes: + for attr in attributes: + cmd.extend(["--attr", attr]) + + cmd.extend( + [ + str(input_file), + "-o", + str(output_file), + ] + ) + + return cmd + + +def run_cli_encrypt( + creds_file: Path, + input_file: Path, + output_file: Path, + cwd: Path, + platform_url: str | None = None, + mime_type: str = "text/plain", + attributes: list[str] | None = None, + container_type: str = "tdf", +) -> subprocess.CompletedProcess: + python_encrypt_cmd = _build_cli_encrypt_command( + creds_file=creds_file, + input_file=input_file, + output_file=output_file, + platform_url=platform_url, + mime_type=mime_type, + attributes=attributes, + container_type=container_type, + ) + + return subprocess.run( + python_encrypt_cmd, + capture_output=True, + text=True, + cwd=cwd, + env=get_testing_environ(), + ) diff --git a/tests/support_common.py b/tests/support_common.py new file mode 100644 index 0000000..c3bcf5c --- /dev/null +++ b/tests/support_common.py @@ -0,0 +1,90 @@ +import logging +import subprocess +import zipfile +from pathlib import Path + +import pytest + +from tests.config_pydantic import CONFIG_TDF + +logger = logging.getLogger(__name__) + + +def get_platform_url() -> str: + # Get platform configuration + platform_url = CONFIG_TDF.OPENTDF_PLATFORM_URL + if not platform_url: + # Fail fast if OPENTDF_PLATFORM_URL is not set + raise Exception( + "OPENTDF_PLATFORM_URL must be set in config for integration tests" + ) + return platform_url + + +def handle_subprocess_error( + result: subprocess.CompletedProcess, collect_server_logs, scenario_name: str +) -> None: + """Handle subprocess errors with proper server log collection and error reporting.""" + if result.returncode != 0: + # Collect server logs for debugging + logs = collect_server_logs() + print(f"Server logs when '{scenario_name}' failed:\n{logs}") + + pytest.fail( + f"Scenario failed: '{scenario_name}': " + f"stdout={result.stdout}, stderr={result.stderr}" + ) + + +def get_testing_environ() -> dict | None: + """ + Set up environment and configuration + + TODO: YAGNI: this is a hook we could use to modify all testing environments, e.g. + env = os.environ.copy() + env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" + return env + """ + return None + + +def validate_tdf3_file(tdf_path: Path, tool_name: str) -> None: + """Validate that a TDF file (tdf_type="tdf3") exists, is not empty, and has correct ZIP structure.""" + assert tdf_path.exists(), f"{tool_name} did not create TDF file" + assert tdf_path.stat().st_size > 0, f"{tool_name} created empty TDF file" + assert zipfile.is_zipfile(tdf_path), f"{tool_name} output is not a valid ZIP file" + + # Verify TDF file has correct ZIP signature + with open(tdf_path, "rb") as f: + tdf_header = f.read(4) + assert tdf_header == b"PK\x03\x04", f"{tool_name} output is not a valid ZIP file" + assert tdf_path.suffix == ".tdf", f"File should have .tdf extension: {tdf_path}" + + +def validate_plaintext_file_created( + path: Path, scenario: str, expected_content: str +) -> None: + """Validate that a non-empty file was created, and contains the expected content""" + assert path.exists(), f"{scenario=} did not create decrypted file" + assert path.stat().st_size > 0, f"{scenario=} created empty decrypted file" + # Verify scenario produces the expected decrypted content + with open(path) as f: + decrypted_content = f.read() + + assert decrypted_content == expected_content, ( + f"otdfctl decrypted content does not match original. " + f"Expected: '{expected_content}', Got: '{decrypted_content}'" + ) + + +def compare_tdf3_file_size(otdfctl_tdf_path: Path, py_cli_tdf_path: Path) -> None: + """Compare the file sizes of two TDF files (tdf_type="tdf3"), assert within 30% of each other.""" + size_otdfctl_tdf = otdfctl_tdf_path.stat().st_size + size_py_cli_tdf = py_cli_tdf_path.stat().st_size + size_diff_ratio = abs(size_otdfctl_tdf - size_py_cli_tdf) / max( + size_otdfctl_tdf, size_py_cli_tdf + ) + + assert size_diff_ratio < 0.3, ( + f"File sizes too different: otdfctl={size_otdfctl_tdf}, cli={size_py_cli_tdf}" + ) diff --git a/tests/support_otdfctl.py b/tests/support_otdfctl.py index c549f5c..1ec0a2e 100644 --- a/tests/support_otdfctl.py +++ b/tests/support_otdfctl.py @@ -1,4 +1,5 @@ import subprocess + import pytest @@ -12,17 +13,12 @@ def check_for_otdfctl(): an exception if the otdfctl command is not found. """ - # TODO Consider setting GRPC_ENFORCE_ALPN_ENABLED to false as needed - # test_env = os.environ.copy() - # test_env["GRPC_ENFORCE_ALPN_ENABLED"] = "false" - # Check if otdfctl is available try: subprocess.run( ["otdfctl", "--version"], capture_output=True, check=True, - # env=test_env ) except (subprocess.CalledProcessError, FileNotFoundError): raise Exception( diff --git a/tests/support_otdfctl_args.py b/tests/support_otdfctl_args.py new file mode 100644 index 0000000..99dcc4a --- /dev/null +++ b/tests/support_otdfctl_args.py @@ -0,0 +1,280 @@ +""" +Support functions for constructing CLI arguments for otdfctl CLI. +""" + +import logging +import subprocess +from pathlib import Path + +from tests.config_pydantic import CONFIG_TDF +from tests.support_common import get_platform_url, get_testing_environ + +logger = logging.getLogger(__name__) + + +def get_otdfctl_flags() -> list[str]: + """ + Determine otdfctl flags based on platform URL + """ + platform_url = get_platform_url() + otdfctl_flags = [] + if platform_url.startswith("http://"): + # otdfctl doesn't have a --plaintext flag, just omit --tls-no-verify for HTTP + pass + else: + # For HTTPS, skip TLS verification if INSECURE_SKIP_VERIFY is True + if CONFIG_TDF.INSECURE_SKIP_VERIFY: + otdfctl_flags = ["--tls-no-verify"] + + return otdfctl_flags + + +def get_otdfctl_base_command( + creds_file: Path, platform_url: str | None = None +) -> list[str]: + """Get base otdfctl command with common flags.""" + base_cmd = [ + "otdfctl", + "--host", + platform_url if platform_url is not None else get_platform_url(), + "--with-client-creds-file", + str(creds_file), + ] + + # Add platform-specific flags + base_cmd.extend(get_otdfctl_flags()) + + return base_cmd + + +def _build_otdfctl_encrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + platform_url: str | None = None, + mime_type: str = "text/plain", + attributes: list[str] | None = None, + tdf_type: str | None = None, + target_mode: str | None = None, +) -> list[str]: + """Build otdfctl encrypt command. + + Args: + platform_url: Platform URL like "http://localhost:8080" + creds_file: Path to credentials file + input_file: Path to the input file to encrypt + output_file: Path where the TDF file should be created + mime_type: Optional MIME type for the input file + attributes: Optional list of attributes to apply + tdf_type: TDF type (e.g., "tdf3", "nano") + target_mode: Target TDF spec version (e.g., "v4.2.2", "v4.3.1") + """ + + cmd = get_otdfctl_base_command(creds_file, platform_url) + cmd.append("encrypt") + cmd.extend(["--mime-type", mime_type]) + + # Add attributes if provided + if attributes: + for attr in attributes: + cmd.extend(["--attr", attr]) + + if tdf_type: + cmd.extend( + [ + "--tdf-type", + tdf_type, + ] + ) + + if target_mode: + cmd.extend(["--target-mode", target_mode]) + + cmd.extend( + [ + str(input_file), + "-o", + str(output_file), + ] + ) + return cmd + + +def run_otdfctl_encrypt_command( + creds_file: Path, + input_file: Path, + output_file: Path, + cwd: Path, + platform_url: str | None = None, + mime_type: str = "text/plain", + attributes: list[str] | None = None, + tdf_type: str | None = None, + target_mode: str | None = None, +) -> subprocess.CompletedProcess: + otdfctl_encrypt_cmd = _build_otdfctl_encrypt_command( + creds_file=creds_file, + input_file=input_file, + output_file=output_file, + platform_url=platform_url, + mime_type=mime_type, + attributes=attributes, + tdf_type=tdf_type, + target_mode=target_mode, + ) + return subprocess.run( + otdfctl_encrypt_cmd, + capture_output=True, + text=True, + cwd=cwd, + env=get_testing_environ(), + ) + + +def _build_otdfctl_decrypt_command( + creds_file: Path, tdf_file: Path, output_file: Path, platform_url: str | None = None +) -> list[str]: + """Build otdfctl decrypt command.""" + cmd = get_otdfctl_base_command(creds_file, platform_url) + cmd.extend( + [ + "decrypt", + str(tdf_file), + "-o", + str(output_file), + ] + ) + + return cmd + + +def run_otdfctl_decrypt_command( + creds_file: Path, + tdf_file: Path, + output_file: Path, + cwd: Path, + platform_url: str | None = None, +) -> subprocess.CompletedProcess: + otdfctl_decrypt_cmd = _build_otdfctl_decrypt_command( + creds_file=creds_file, + tdf_file=tdf_file, + output_file=output_file, + platform_url=platform_url, + ) + + return subprocess.run( + otdfctl_decrypt_cmd, + capture_output=True, + text=True, + cwd=cwd, + env=get_testing_environ(), + ) + + +def _generate_target_mode_tdf( + input_file: Path, + output_file: Path, + target_mode: str, + creds_file: Path, + attributes: list[str] | None = None, + mime_type: str | None = None, +) -> None: + # Ensure output directory exists + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Build otdfctl command + cmd = _build_otdfctl_encrypt_command( + platform_url=get_platform_url(), + creds_file=creds_file, + input_file=input_file, + output_file=output_file, + mime_type=mime_type if mime_type else "text/plain", + attributes=attributes if attributes else None, + tdf_type="tdf3", + target_mode=target_mode, + ) + + # Run otdfctl command + result = subprocess.run( + cmd, + capture_output=True, + text=True, + env=get_testing_environ(), + ) + + if result.returncode != 0: + logger.error(f"otdfctl command failed: {result.stderr}") + raise Exception( + f"Failed to generate TDF with target mode {target_mode}: " + f"stdout={result.stdout}, stderr={result.stderr}" + ) + + +def otdfctl_generate_tdf_files_for_target_mode( + target_mode: str, + temp_credentials_file: Path, + test_data_dir: Path, + sample_input_files: dict[str, Path], +) -> dict[str, Path]: + """ + Factory function to generate TDF files for a specific target mode. + + Args: + target_mode: Target TDF spec version (e.g., "v4.2.2", "v4.3.1") + temp_credentials_file: Path to credentials file + test_data_dir: Base test data directory + sample_input_files: Dictionary of sample input files + + Returns: + Dictionary mapping file types to their TDF file paths + """ + output_dir = test_data_dir / target_mode + tdf_files = {} + + # Define the file generation configurations + file_configs = [ + { + "key": "text", + "input_key": "text", + "output_name": "sample_text.txt.tdf", + "mime_type": "text/plain", + }, + # { + # "key": "empty", + # "input_key": "empty", + # "output_name": "empty_file.txt.tdf", + # "mime_type": "text/plain", + # }, + { + "key": "binary", + "input_key": "binary", + "output_name": "sample_binary.png.tdf", + "mime_type": "image/png", + }, + { + "key": "with_attributes", + "input_key": "with_attributes", + "output_name": "sample_with_attributes.txt.tdf", + "mime_type": "text/plain", + }, + ] + + try: + for config in file_configs: + tdf_path = output_dir / config["output_name"] + _generate_target_mode_tdf( + sample_input_files[config["input_key"]], + tdf_path, + target_mode, + temp_credentials_file, + attributes=[CONFIG_TDF.TEST_OPENTDF_ATTRIBUTE_1] + if config["key"] == "with_attributes" + else None, + mime_type=config["mime_type"], + ) + tdf_files[config["key"]] = tdf_path + + return tdf_files + + except Exception as e: + logger.error(f"Error generating {target_mode} TDF files: {e}") + raise Exception(f"Failed to generate {target_mode} TDF files: {e}") from e diff --git a/tests/test_aesgcm.py b/tests/test_aesgcm.py index b7a7e73..965f62d 100644 --- a/tests/test_aesgcm.py +++ b/tests/test_aesgcm.py @@ -1,6 +1,7 @@ +import os import unittest + from otdf_python.aesgcm import AesGcm -import os class TestAesGcm(unittest.TestCase): diff --git a/tests/test_assertion_config.py b/tests/test_assertion_config.py index 15b085c..f7c1a30 100644 --- a/tests/test_assertion_config.py +++ b/tests/test_assertion_config.py @@ -1,13 +1,14 @@ import unittest + from otdf_python.assertion_config import ( - Type, - Scope, - AssertionKeyAlg, AppliesToState, - BindingMethod, + AssertionConfig, AssertionKey, + AssertionKeyAlg, + BindingMethod, + Scope, Statement, - AssertionConfig, + Type, ) diff --git a/tests/test_asym_encryption.py b/tests/test_asym_encryption.py index e54eac9..617eb35 100644 --- a/tests/test_asym_encryption.py +++ b/tests/test_asym_encryption.py @@ -1,5 +1,5 @@ -from otdf_python.asym_encryption import AsymEncryption from otdf_python.asym_decryption import AsymDecryption +from otdf_python.asym_encryption import AsymEncryption from tests.mock_crypto import generate_rsa_keypair diff --git a/tests/test_autoconfigure_utils.py b/tests/test_autoconfigure_utils.py index 4916712..39fa412 100644 --- a/tests/test_autoconfigure_utils.py +++ b/tests/test_autoconfigure_utils.py @@ -1,10 +1,11 @@ import unittest + from otdf_python.autoconfigure_utils import ( - RuleType, - KeySplitStep, AttributeNameFQN, AttributeValueFQN, AutoConfigureException, + KeySplitStep, + RuleType, ) diff --git a/tests/test_cli.py b/tests/test_cli.py index ae53009..60e31ca 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,21 +2,22 @@ Test CLI functionality """ -import pytest +import os import subprocess import sys import tempfile -import os from pathlib import Path +import pytest + -def test_cli_help(): +def test_cli_help(project_root): """Test that CLI help command works""" result = subprocess.run( [sys.executable, "-m", "otdf_python", "--help"], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 0 assert "OpenTDF CLI" in result.stdout @@ -25,13 +26,13 @@ def test_cli_help(): assert "inspect" in result.stdout -def test_cli_version(): +def test_cli_version(project_root): """Test that CLI version command works""" result = subprocess.run( [sys.executable, "-m", "otdf_python", "--version"], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 0 assert "OpenTDF Python SDK" in result.stdout @@ -51,13 +52,13 @@ def test_cli_version(): assert expected_version in result.stdout -def test_cli_encrypt_help(): +def test_cli_encrypt_help(project_root): """Test that CLI encrypt help works""" result = subprocess.run( [sys.executable, "-m", "otdf_python", "encrypt", "--help"], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 0 assert "Path to file to encrypt" in result.stdout @@ -65,32 +66,32 @@ def test_cli_encrypt_help(): assert "--container-type" in result.stdout -def test_cli_decrypt_help(): +def test_cli_decrypt_help(project_root): """Test that CLI decrypt help works""" result = subprocess.run( [sys.executable, "-m", "otdf_python", "decrypt", "--help"], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 0 assert "Path to encrypted file" in result.stdout assert "--output" in result.stdout -def test_cli_inspect_help(): +def test_cli_inspect_help(project_root): """Test that CLI inspect help works""" result = subprocess.run( [sys.executable, "-m", "otdf_python", "inspect", "--help"], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 0 assert "Path to encrypted file" in result.stdout -def test_cli_encrypt_missing_auth(): +def test_cli_encrypt_missing_auth(project_root): """Test that CLI encrypt fails gracefully without authentication""" with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: f.write("test content") @@ -101,7 +102,7 @@ def test_cli_encrypt_missing_auth(): [sys.executable, "-m", "otdf_python", "encrypt", temp_file], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 1 assert "Authentication required" in result.stderr @@ -110,7 +111,7 @@ def test_cli_encrypt_missing_auth(): os.unlink(temp_file) -def test_cli_encrypt_missing_creds_file(): +def test_cli_encrypt_missing_creds_file(project_root): """Test that CLI encrypt fails gracefully with missing credentials file""" with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: f.write("test content") @@ -129,7 +130,7 @@ def test_cli_encrypt_missing_creds_file(): ], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 1 assert "Credentials file does not exist" in result.stderr @@ -137,7 +138,7 @@ def test_cli_encrypt_missing_creds_file(): os.unlink(temp_file) -def test_cli_encrypt_invalid_creds_file(): +def test_cli_encrypt_invalid_creds_file(project_root): """Test that CLI encrypt fails gracefully with invalid credentials file""" with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: f.write("test content") @@ -160,7 +161,7 @@ def test_cli_encrypt_invalid_creds_file(): ], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 1 assert "must contain 'clientId' and 'clientSecret' fields" in result.stderr @@ -169,13 +170,13 @@ def test_cli_encrypt_invalid_creds_file(): os.unlink(creds_file) -def test_cli_decrypt_missing_file(): +def test_cli_decrypt_missing_file(project_root): """Test that CLI decrypt fails gracefully with missing file""" result = subprocess.run( [sys.executable, "-m", "otdf_python", "decrypt", "nonexistent.tdf"], capture_output=True, text=True, - cwd=Path(__file__).parent.parent, + cwd=project_root, ) assert result.returncode == 1 assert "File does not exist" in result.stderr diff --git a/tests/test_collection_store.py b/tests/test_collection_store.py index 3131d87..5d18c24 100644 --- a/tests/test_collection_store.py +++ b/tests/test_collection_store.py @@ -1,8 +1,9 @@ import unittest + from otdf_python.collection_store import ( CollectionKey, - NoOpCollectionStore, CollectionStoreImpl, + NoOpCollectionStore, ) diff --git a/tests/test_config.py b/tests/test_config.py index 442f0d1..421691b 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,4 +1,4 @@ -from otdf_python.config import TDFConfig, KASInfo, get_kas_address +from otdf_python.config import KASInfo, TDFConfig, get_kas_address def test_tdf_config_defaults(): diff --git a/tests/test_crypto_utils.py b/tests/test_crypto_utils.py index 5f5fefa..2a802f5 100644 --- a/tests/test_crypto_utils.py +++ b/tests/test_crypto_utils.py @@ -1,7 +1,9 @@ import unittest -from otdf_python.crypto_utils import CryptoUtils + from cryptography.hazmat.primitives.asymmetric import ec +from otdf_python.crypto_utils import CryptoUtils + class TestCryptoUtils(unittest.TestCase): def test_hmac(self): diff --git a/tests/test_eckeypair.py b/tests/test_eckeypair.py index 81dddf5..b193dce 100644 --- a/tests/test_eckeypair.py +++ b/tests/test_eckeypair.py @@ -1,4 +1,5 @@ import unittest + from otdf_python.eckeypair import ECKeyPair diff --git a/tests/test_header.py b/tests/test_header.py index 62fcb79..678f931 100644 --- a/tests/test_header.py +++ b/tests/test_header.py @@ -1,9 +1,10 @@ +import unittest + +from otdf_python.ecc_mode import ECCMode from otdf_python.header import Header +from otdf_python.policy_info import PolicyInfo from otdf_python.resource_locator import ResourceLocator -from otdf_python.ecc_mode import ECCMode from otdf_python.symmetric_and_payload_config import SymmetricAndPayloadConfig -from otdf_python.policy_info import PolicyInfo -import unittest class TestHeader(unittest.TestCase): diff --git a/tests/test_inner_classes.py b/tests/test_inner_classes.py index 378abc7..3b5ea89 100644 --- a/tests/test_inner_classes.py +++ b/tests/test_inner_classes.py @@ -1,4 +1,5 @@ import unittest + from otdf_python.auth_headers import AuthHeaders from otdf_python.kas_info import KASInfo from otdf_python.policy_binding_serializer import PolicyBinding, PolicyBindingSerializer diff --git a/tests/test_kas_client.py b/tests/test_kas_client.py index 9620d45..b4c3b17 100644 --- a/tests/test_kas_client.py +++ b/tests/test_kas_client.py @@ -2,9 +2,11 @@ Unit tests for KASClient. """ -import pytest -from unittest.mock import patch, MagicMock from base64 import b64decode +from unittest.mock import MagicMock, patch + +import pytest + from otdf_python.kas_client import KASClient, KeyAccess from otdf_python.kas_key_cache import KASKeyCache from otdf_python.sdk_exceptions import SDKException diff --git a/tests/test_kas_key_cache.py b/tests/test_kas_key_cache.py index e665beb..b4e6cc7 100644 --- a/tests/test_kas_key_cache.py +++ b/tests/test_kas_key_cache.py @@ -2,9 +2,10 @@ Unit tests for KASKeyCache. """ -from otdf_python.kas_key_cache import KASKeyCache from dataclasses import dataclass +from otdf_python.kas_key_cache import KASKeyCache + @dataclass class MockKasInfo: diff --git a/tests/test_kas_key_management.py b/tests/test_kas_key_management.py index d1f1527..cb964df 100644 --- a/tests/test_kas_key_management.py +++ b/tests/test_kas_key_management.py @@ -1,11 +1,12 @@ -import unittest -from unittest.mock import Mock, patch import base64 import os +import unittest +from unittest.mock import Mock, patch + import pytest from otdf_python.kas_client import KASClient, KeyAccess -from otdf_python.key_type_constants import RSA_KEY_TYPE, EC_KEY_TYPE +from otdf_python.key_type_constants import EC_KEY_TYPE, RSA_KEY_TYPE class TestKASKeyManagement(unittest.TestCase): diff --git a/tests/test_key_type.py b/tests/test_key_type.py index 872b2c8..e899b90 100644 --- a/tests/test_key_type.py +++ b/tests/test_key_type.py @@ -1,4 +1,5 @@ import unittest + from otdf_python.key_type import KeyType diff --git a/tests/test_log_collection.py b/tests/test_log_collection.py index 2de4b75..1845290 100644 --- a/tests/test_log_collection.py +++ b/tests/test_log_collection.py @@ -5,8 +5,8 @@ This script tests the server log collection without running full pytest. """ -from tests.server_logs import collect_server_logs, log_server_logs_on_failure from tests.config_pydantic import CONFIG_TESTING +from tests.server_logs import collect_server_logs, log_server_logs_on_failure def test_log_collection(): diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 49fd5fa..f9e36d1 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -1,11 +1,11 @@ from otdf_python.manifest import ( Manifest, - ManifestEncryptionInformation, - ManifestPayload, ManifestAssertion, - ManifestMethod, - ManifestKeyAccess, + ManifestEncryptionInformation, ManifestIntegrityInformation, + ManifestKeyAccess, + ManifestMethod, + ManifestPayload, ManifestRootSignature, ManifestSegment, ) diff --git a/tests/test_manifest_format.py b/tests/test_manifest_format.py index 38353e7..674aa26 100644 --- a/tests/test_manifest_format.py +++ b/tests/test_manifest_format.py @@ -3,10 +3,9 @@ """ import json -from otdf_python.tdf import TDF -from otdf_python.config import TDFConfig, KASInfo - +from otdf_python.config import KASInfo, TDFConfig +from otdf_python.tdf import TDF from tests.mock_crypto import generate_rsa_keypair diff --git a/tests/test_nanotdf.py b/tests/test_nanotdf.py index dd36e09..1517d1e 100644 --- a/tests/test_nanotdf.py +++ b/tests/test_nanotdf.py @@ -1,7 +1,9 @@ -import pytest import secrets -from otdf_python.nanotdf import NanoTDF, NanoTDFMaxSizeLimit, InvalidNanoTDFConfig + +import pytest + from otdf_python.config import NanoTDFConfig +from otdf_python.nanotdf import InvalidNanoTDFConfig, NanoTDF, NanoTDFMaxSizeLimit def test_nanotdf_roundtrip(): @@ -39,8 +41,8 @@ def test_nanotdf_invalid_magic(): @pytest.mark.integration def test_nanotdf_integration_encrypt_decrypt(): # Load environment variables for integration - from tests.config_pydantic import CONFIG_TDF from otdf_python.config import KASInfo + from tests.config_pydantic import CONFIG_TDF # Create KAS info from configuration kas_info = KASInfo(url=CONFIG_TDF.KAS_ENDPOINT) diff --git a/tests/test_nanotdf_ecdsa_struct.py b/tests/test_nanotdf_ecdsa_struct.py index c8b71eb..d83eb16 100644 --- a/tests/test_nanotdf_ecdsa_struct.py +++ b/tests/test_nanotdf_ecdsa_struct.py @@ -5,8 +5,8 @@ import pytest from otdf_python.nanotdf_ecdsa_struct import ( - NanoTDFECDSAStruct, IncorrectNanoTDFECDSASignatureSize, + NanoTDFECDSAStruct, ) diff --git a/tests/test_nanotdf_integration.py b/tests/test_nanotdf_integration.py index cb1d47e..943cfaf 100644 --- a/tests/test_nanotdf_integration.py +++ b/tests/test_nanotdf_integration.py @@ -1,9 +1,11 @@ +import io + import pytest -from otdf_python.nanotdf import NanoTDF -from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization -import io -from otdf_python.config import NanoTDFConfig, KASInfo +from cryptography.hazmat.primitives.asymmetric import rsa + +from otdf_python.config import KASInfo, NanoTDFConfig +from otdf_python.nanotdf import NanoTDF @pytest.mark.integration diff --git a/tests/test_nanotdf_type.py b/tests/test_nanotdf_type.py index f3b614d..c93c8b8 100644 --- a/tests/test_nanotdf_type.py +++ b/tests/test_nanotdf_type.py @@ -1,10 +1,11 @@ import unittest + from otdf_python.nanotdf_type import ( + Cipher, ECCurve, - Protocol, IdentifierType, PolicyType, - Cipher, + Protocol, ) diff --git a/tests/test_policy_object.py b/tests/test_policy_object.py index f75e13d..a0ceb01 100644 --- a/tests/test_policy_object.py +++ b/tests/test_policy_object.py @@ -1,4 +1,5 @@ import unittest + from otdf_python.policy_object import AttributeObject, PolicyBody, PolicyObject diff --git a/tests/test_sdk_builder.py b/tests/test_sdk_builder.py index d3b039f..0e9f835 100644 --- a/tests/test_sdk_builder.py +++ b/tests/test_sdk_builder.py @@ -3,10 +3,11 @@ """ import os +import tempfile +from unittest.mock import MagicMock, patch + import pytest import respx -import tempfile -from unittest.mock import patch, MagicMock from otdf_python.sdk import SDK from otdf_python.sdk_builder import SDKBuilder diff --git a/tests/test_sdk_exceptions.py b/tests/test_sdk_exceptions.py index 9a5307e..92ddf4b 100644 --- a/tests/test_sdk_exceptions.py +++ b/tests/test_sdk_exceptions.py @@ -1,5 +1,6 @@ import unittest -from otdf_python.sdk_exceptions import SDKException, AutoConfigureException + +from otdf_python.sdk_exceptions import AutoConfigureException, SDKException class TestSDKExceptions(unittest.TestCase): diff --git a/tests/test_sdk_mock.py b/tests/test_sdk_mock.py index 531e309..088a452 100644 --- a/tests/test_sdk_mock.py +++ b/tests/test_sdk_mock.py @@ -1,12 +1,12 @@ from otdf_python.sdk import ( - SDK, KAS, + SDK, AttributesServiceClientInterface, - NamespaceServiceClientInterface, - SubjectMappingServiceClientInterface, - ResourceMappingServiceClientInterface, AuthorizationServiceClientInterface, KeyAccessServerRegistryServiceClientInterface, + NamespaceServiceClientInterface, + ResourceMappingServiceClientInterface, + SubjectMappingServiceClientInterface, ) diff --git a/tests/test_tdf.py b/tests/test_tdf.py index 664f0ac..699ba32 100644 --- a/tests/test_tdf.py +++ b/tests/test_tdf.py @@ -1,11 +1,12 @@ -from otdf_python.tdf import TDF, TDFReaderConfig -from otdf_python.config import TDFConfig, KASInfo -from otdf_python.manifest import Manifest import io -import zipfile import json +import zipfile + import pytest +from otdf_python.config import KASInfo, TDFConfig +from otdf_python.manifest import Manifest +from otdf_python.tdf import TDF, TDFReaderConfig from tests.mock_crypto import generate_rsa_keypair diff --git a/tests/test_tdf_key_management.py b/tests/test_tdf_key_management.py index 25cfc0c..03d1fff 100644 --- a/tests/test_tdf_key_management.py +++ b/tests/test_tdf_key_management.py @@ -1,20 +1,20 @@ -import unittest -from unittest.mock import Mock, patch import base64 import io +import unittest import zipfile +from unittest.mock import Mock, patch -from otdf_python.tdf import TDF, TDFReaderConfig from otdf_python.manifest import ( Manifest, ManifestEncryptionInformation, + ManifestIntegrityInformation, + ManifestKeyAccess, ManifestMethod, ManifestPayload, - ManifestKeyAccess, - ManifestIntegrityInformation, ManifestRootSignature, ManifestSegment, ) +from otdf_python.tdf import TDF, TDFReaderConfig class TestTDFKeyManagement(unittest.TestCase): diff --git a/tests/test_tdf_reader.py b/tests/test_tdf_reader.py index 12e09f0..5e7c634 100644 --- a/tests/test_tdf_reader.py +++ b/tests/test_tdf_reader.py @@ -4,15 +4,16 @@ import io import json -import pytest from unittest.mock import MagicMock, patch +import pytest + +from otdf_python.policy_object import PolicyObject from otdf_python.tdf_reader import ( - TDFReader, TDF_MANIFEST_FILE_NAME, TDF_PAYLOAD_FILE_NAME, + TDFReader, ) -from otdf_python.policy_object import PolicyObject class TestTDFReader: diff --git a/tests/test_tdf_writer.py b/tests/test_tdf_writer.py index e586446..4d6ff79 100644 --- a/tests/test_tdf_writer.py +++ b/tests/test_tdf_writer.py @@ -1,6 +1,7 @@ -import unittest import io +import unittest import zipfile + from otdf_python.tdf_writer import TDFWriter diff --git a/tests/test_token_source.py b/tests/test_token_source.py index 972254d..5bc9e28 100644 --- a/tests/test_token_source.py +++ b/tests/test_token_source.py @@ -3,7 +3,8 @@ """ import time -from unittest.mock import patch, MagicMock +from unittest.mock import MagicMock, patch + from otdf_python.token_source import TokenSource diff --git a/tests/test_url_normalization.py b/tests/test_url_normalization.py index 9c71351..6c6eab9 100644 --- a/tests/test_url_normalization.py +++ b/tests/test_url_normalization.py @@ -7,8 +7,8 @@ """ # Allow importing from src directory -import sys import os +import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) diff --git a/tests/test_use_plaintext_flow.py b/tests/test_use_plaintext_flow.py index dfc09e6..3019bed 100644 --- a/tests/test_use_plaintext_flow.py +++ b/tests/test_use_plaintext_flow.py @@ -2,7 +2,7 @@ Test to verify that the use_plaintext parameter flows correctly from SDKBuilder to KASClient. """ -from unittest.mock import patch, MagicMock +from unittest.mock import MagicMock, patch from otdf_python.sdk_builder import SDKBuilder diff --git a/tests/test_validate_otdf_python.py b/tests/test_validate_otdf_python.py index b2bfc3c..10b3860 100644 --- a/tests/test_validate_otdf_python.py +++ b/tests/test_validate_otdf_python.py @@ -6,13 +6,14 @@ uv run pytest tests/test_validate_otdf_python.py """ +import logging import sys import tempfile -import logging from pathlib import Path -from otdf_python.tdf import TDFReaderConfig + import pytest +from otdf_python.tdf import TDFReaderConfig from tests.integration.support_sdk import get_sdk # Set up detailed logging diff --git a/tests/test_version.py b/tests/test_version.py index 4232608..c4740cf 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -1,4 +1,5 @@ import unittest + from otdf_python.version import Version diff --git a/tests/test_zip_reader.py b/tests/test_zip_reader.py index 1d98120..11af01c 100644 --- a/tests/test_zip_reader.py +++ b/tests/test_zip_reader.py @@ -1,8 +1,9 @@ -import unittest import io import random -from otdf_python.zip_writer import ZipWriter +import unittest + from otdf_python.zip_reader import ZipReader +from otdf_python.zip_writer import ZipWriter class TestZipReader(unittest.TestCase): diff --git a/tests/test_zip_writer.py b/tests/test_zip_writer.py index cb1112d..bf7dccd 100644 --- a/tests/test_zip_writer.py +++ b/tests/test_zip_writer.py @@ -1,8 +1,9 @@ -import unittest import io -from otdf_python.zip_writer import ZipWriter +import unittest import zipfile +from otdf_python.zip_writer import ZipWriter + class TestZipWriter(unittest.TestCase): def test_data_and_stream(self):