In [1]:
import copy
import functools
import logging
from dataclasses import dataclass
import pickle
from typing import TypeVar
from enum import StrEnum

from osmocom.tlv import BER_TLV_IE
from osmocom.utils import b2h, h2b, i2h
from pySim.commands import SimCardCommands
from pySim.euicc import (
    EuiccConfiguredAddresses,
    EuiccInfo2,
    GetEuiccChallenge,
    GetEuiccData,
    TagList,
)
from pySim.transport.pcsc import PcscSimLink

# Generic placeholder type; used below to annotate ``Operation.result``.
T = TypeVar("T")
# Show DEBUG records so the "Intercepted"/"Recorded"/"Replaying" traces are visible.
logging.basicConfig(level=logging.DEBUG)


class ESIMException(Exception):
    """Exception that carries a status word.

    Attributes:
        sw: The status word value supplied when the exception was created.
    """

    def __init__(self, sw: str):
        # Forward ``sw`` to ``Exception`` so ``args`` is populated. Without this,
        # ``repr()`` shows no detail and copying/pickling the exception fails,
        # because the instance is rebuilt by calling the class with ``args``.
        super().__init__(sw)
        self.sw = sw

    def __str__(self):
        return f"Error: {self.sw}"


class MUTATION(StrEnum):
    """Names of the mutation strategies that ``MutationEngine.mutate`` dispatches on."""

    # Handled by MutationEngine.bitflip.
    BITFLIP = "bitflip"
    # Handled by MutationEngine.random_byte.
    RANDOM_BYTE = "random_byte"
    # Handled by MutationEngine.zero_block.
    ZERO_BLOCK = "zero_block"
    # Handled by MutationEngine.shuffle_blocks.
    SHUFFLE_BLOCKS = "shuffle_blocks"
    # Handled by MutationEngine.truncate.
    TRUNCATE = "truncate"


class APDUPacket:
    """A command APDU: four header bytes plus optional command data and Le byte."""

    def __init__(
        self,
        cla: int,
        ins: int,
        p1: int,
        p2: int,
        data: bytes = b"",
        le: int = 0,
    ):
        """Store the header bytes, the command data and the Le value."""
        self.cla, self.ins, self.p1, self.p2 = cla, ins, p1, p2
        self.data = data
        self.le = le

    def __str__(self):
        return f"APDU (cla={b2h([self.cla])} ins={i2h([self.ins])} p1={i2h([self.p1])} p2={i2h([self.p2])} lc={i2h([self.lc])} data={b2h(self.data)} p3/le={i2h([self.le])})"

    @property
    def lc(self):
        """Length of the command data; 0 when ``data`` is empty or otherwise falsy."""
        if not self.data:
            return 0
        return len(self.data)

    def to_hex(self) -> str:
        """Serialise the packet to a lowercase hex string.

        Layout: CLA INS P1 P2, then ``Lc`` + data only when there is data, then
        ``Le`` only when it is greater than zero.
        """
        header = bytes((self.cla, self.ins, self.p1, self.p2))

        length = self.lc
        body = (bytes((length,)) + bytes(self.data)) if length > 0 else b""
        trailer = bytes((self.le,)) if self.le > 0 else b""

        return (header + body + trailer).hex()


@dataclass
class Operation:
    """Record of one call intercepted by the ``operation`` decorator."""

    # ``name`` attribute of the application object the call was made on.
    application_name: str
    # ``__name__`` of the wrapped function.
    func_name: str
    # Value returned by the wrapped function.
    result: T
    # Deep copy of the positional arguments as supplied by the caller.
    original_args: list
    # First value returned by ``mutate_data(original_args)``.
    # NOTE(review): annotated as ``dict``, but the decorator stores whatever
    # container ``mutate_data`` returns for the positional arguments.
    mutated_args: dict
    # Second value returned by ``mutate_data``: a list of (index, old, new)
    # tuples, or None when ``mutate_data`` did not attempt a mutation.
    mutation_info: list
    # Keyword arguments of the call; passed to the wrapped function unchanged.
    kwargs: dict


class MutationEngine:
    def __init__(self, mutation_rate: float = 0.01):
        self.mutation_rate = mutation_rate

    def bitflip(self, data: bytearray):
        length = len(data)
        num_flips = max(1, int(length * self.mutation_rate))

        for i in range(num_flips):
            index = (i * 31) % length  # Deterministic index selection
            bit = 1 << ((i * 7) % 8)  # Deterministic bit selection
            data[index] ^= bit

        return bytes(data)

    def random_byte(self, data: bytearray):
        length = len(data)
        num_mutations = max(1, int(length * self.mutation_rate))

        for i in range(num_mutations):
            index = (i * 29) % length
            data[index] = (index * 13) % 256

        return bytes(data)

    def zero_block(self, data: bytearray):
        length = len(data)
        start = (length // 4) % max(1, length - 20)
        end = min(length, start + 10)

        for i in range(start, end):
            data[i] = 0x00

        return bytes(data)

    def shuffle_blocks(self, data: bytearray):
        block_size = 16
        num_blocks = len(data) // block_size
        blocks = [
            data[i * block_size : (i + 1) * block_size] for i in range(num_blocks)
        ]

        # Deterministic block reordering based on index rotation
        blocks = sorted(blocks, key=lambda b: sum(b) % 256)
        data = b"".join(blocks)
        return bytes(data)

    def truncate(self, data: bytearray):
        truncate_point = (len(data) * 3) // 4  # Fixed truncation at 75%
        data = data[:truncate_point]
        return bytes(data)

    def mutate(
        self,
        apdu: APDUPacket,
        mutation_type: MUTATION,
    ) -> APDUPacket:
        data = bytearray(apdu.data)

        match mutation_type:
            case MUTATION.BITFLIP:
                apdu.data = self.bitflip(data)

            case MUTATION.RANDOM_BYTE:
                apdu.data = self.random_byte(data)

            case MUTATION.ZERO_BLOCK:
                apdu.data = self.zero_block(data)

            case MUTATION.SHUFFLE_BLOCKS:
                apdu.data = self.shuffle_blocks(data)

            case MUTATION.TRUNCATE:
                apdu.data = self.truncate(data)

        return apdu


def operation(func, mutation_type: str | None = None):
    """Decorator to intercept function calls, mutate input, and track store_data calls."""

    mutation_engine = MutationEngine()

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        original_args = copy.deepcopy(args)
        mutated_args, mutation_info = mutate_data(original_args)

        logging.debug(
            f"Intercepted: {self.name}-{func.__name__}({args}, {kwargs}) -> Mutated: {mutated_args}"
        )

        result = func(self, *mutated_args, **kwargs)
        operation = Operation(
            self.name,
            func.__name__,
            result,
            original_args,
            mutated_args,
            mutation_info,
            kwargs,
        )
        self._last_call = operation
        self._store_calls = []
        return result

    return wrapper


class OperationRecorder:
    """Collects intercepted operations with their store_data exchanges, and can
    persist them to disk and replay them against a card."""

    def __init__(self):
        # List of (Operation, store_calls) tuples in recording order.
        self.history = []

    def record(self, operation: Operation, store_calls: list):
        """Append ``(operation, store_calls)`` to the history and log it.

        ``store_calls`` is stored by reference, so entries appended to it
        later remain visible through the history.
        """
        self.history.append((operation, store_calls))
        logging.debug(
            f"Recorded: {operation.func_name}({operation.mutated_args}, {operation.kwargs}) with mutation {operation.mutation_info} -> {len(store_calls)} store_data calls"
        )

    def save_file(self, file_path: str):
        """Pickle this recorder (including its history) to ``file_path``."""
        with open(file_path, "wb") as f:
            pickle.dump(self, f)

    def replay(self, file_path: str, card: "Card"):
        """Load a pickled recorder from ``file_path`` and re-run its operations on ``card``.

        For each recorded operation the matching application (looked up by
        name among ``card.supported_applications``) is selected and the method
        named ``func_name`` is invoked with the recorded original arguments.
        The recorded APDU/response pairs are only logged, not re-sent. The
        history of ``self`` is not used; only the loaded recorder's is.

        Raises:
            Exception: If a recorded application name is not supported by ``card``.
        """
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only replay recordings that come from a trusted source.
        with open(file_path, "rb") as f:
            recorder = pickle.load(f)

        # The lookup table does not depend on the operation, so build it once.
        applications_by_name = {
            app.name: app for app in card.supported_applications.values()
        }

        for operation, store_calls in recorder.history:
            logging.debug(
                f"Replaying: {operation.func_name}({operation.mutated_args}, {operation.kwargs}) with mutation {operation.mutation_info} -> {len(store_calls)} store_data calls"
            )
            application = applications_by_name.get(operation.application_name)
            if application is None:
                raise Exception(
                    f"Application {operation.application_name} not found"
                )
            card.commands.select_adf(application.aid)
            getattr(application, operation.func_name)(
                *operation.original_args, **operation.kwargs
            )
            for apdu, response in store_calls:
                logging.debug(
                    f"Sending APDU: {apdu.to_hex()} -> Response: {response}"
                )


def mutate_data(data):
    """Return a mutated deep copy of ``data`` plus a description of the changes.

    Every ``int`` element of a list or tuple is incremented by one (``bool`` is
    a subclass of ``int`` and is therefore incremented as well). Tuples are
    handled like lists because positional ``*args`` always arrive as a tuple;
    with list-only handling the arguments of an intercepted call were never
    mutated. The input itself is never modified.

    Args:
        data: Sequence of call arguments, or any other value.

    Returns:
        ``(mutated, mutation_info)``. For a list or tuple, ``mutated`` has the
        same container kind and ``mutation_info`` is a list of
        ``(index, old_value, new_value)`` tuples. Any other input is returned
        unchanged with ``None`` as ``mutation_info``.
    """
    if not isinstance(data, (list, tuple)):
        return data, None

    copied = copy.deepcopy(data)
    is_tuple = isinstance(copied, tuple)
    # Tuples are immutable, so mutate a list view and convert back at the end.
    items = list(copied) if is_tuple else copied

    mutation_info = []
    for index, value in enumerate(items):
        if isinstance(value, int):
            mutation_info.append((index, value, value + 1))
            items[index] = value + 1

    return (tuple(items) if is_tuple else items), mutation_info


class Application:
    """Base class for a card application addressed by an AID.

    Subclasses provide ``aid`` and ``name`` (and optionally
    ``alternative_aids``) as class attributes.
    """

    aid: str
    # Shared class-level default; subclasses override it rather than mutate it.
    alternative_aids: list[str] = []
    name: str

    def __init__(
        self,
        link: PcscSimLink,
        aid: str | None = None,
        recorder: OperationRecorder | None = None,
    ):
        """Bind the application to ``link``.

        Args:
            link: Transport used to exchange APDUs with the card.
            aid: AID to use instead of the class-level default.
            recorder: Optional recorder that receives intercepted operations.
        """
        self.link = link
        self.aid = aid or self.aid
        self.recorder = recorder

        # State maintained by the ``operation`` decorator and ``store_data``.
        self._last_call = None
        self._store_calls = []

    def send_apdu(self, apdu: APDUPacket, expected_sw: int = 0x9000):
        """Send ``apdu`` over the link and return the response data.

        Raises:
            Exception: If the status word differs from ``expected_sw``.
        """
        data, sw = self.link.send_apdu(apdu.to_hex())
        # ``store_data`` passes the status word to the same link as a hex
        # string ("9000"), so a string status word is converted before it is
        # compared with the integer ``expected_sw``. Integers pass through.
        sw_value = int(sw, 16) if isinstance(sw, str) else sw
        if sw_value != expected_sw:
            raise Exception(f"Error: {sw}")

        return data

    def _merge_dicts(self, dicts: list[dict]) -> dict:
        """Merge ``dicts`` into one dict; later entries win on duplicate keys."""
        return {k: v for d in dicts for k, v in d.items()}

    def store_data(self, data: str):
        """Send hex ``data`` in an APDU with CLA 0x80, INS 0xE2, P1 0x91, P2 0x00.

        The APDU goes through ``link.send_apdu_checksw`` with ``sw="9000"``.
        While an intercepted operation is active (``_last_call`` is set) the
        APDU/response pair is appended to ``_store_calls``; on the first pair
        the operation is handed to the recorder, which keeps a reference to
        the list and therefore also sees later pairs.

        Returns:
            Whatever ``link.send_apdu_checksw`` returns.
        """
        apdu = APDUPacket(cla=0x80, ins=0xE2, p1=0x91, p2=0x00, data=h2b(data))
        response = self.link.send_apdu_checksw(apdu.to_hex(), sw="9000")

        if self._last_call:
            self._store_calls.append((apdu, response))
            # ``recorder`` is optional; without one there is nothing to notify.
            if self.recorder is not None and len(self._store_calls) == 1:
                self.recorder.record(self._last_call, self._store_calls)

        return response

    def store_data_tlv(
        self,
        command_cls: type[BER_TLV_IE] | BER_TLV_IE,
        response_cls: type[BER_TLV_IE] | None = None,
    ) -> BER_TLV_IE | None:
        """Encode a TLV command, send it via ``store_data`` and decode the reply.

        Args:
            command_cls: Either a ready-made TLV instance, or a class (or any
                zero-argument factory) that is called to create the command.
            response_cls: Class used to decode the response; defaults to the
                type of the command.

        Returns:
            The decoded response object, or ``None`` if the card returned no data.

        Raises:
            ValueError: If the encoded command is longer than 255 bytes.
        """
        # An existing instance is used as-is; anything else is instantiated.
        if isinstance(command_cls, BER_TLV_IE):
            command = command_cls
        else:
            command = command_cls()
        command_encoded = command.to_tlv()

        if len(command_encoded) > 255:
            raise ValueError("Data too long")

        data, _ = self.store_data(b2h(command_encoded))

        if response_cls is None:
            response_cls = type(command)

        if data:
            response = response_cls()
            response.from_tlv(h2b(data))
            return response

        return None


@dataclass
class Profile:
    smdpp_address: str
    matching_id: str
    confirmation_code: str | None = None

    def full_activation_code(self) -> str:
        return f"LPA:1${self.smdpp_address}${self.matching_id}"

    @staticmethod
    def from_activation_code(activation_code: str) -> "Profile":
        if not activation_code.startswith("LPA:1$"):
            raise ValueError("Invalid activation code")

        parts = activation_code.split("$")
        assert len(parts) == 3

        return Profile(parts[1], parts[2])


class USIM(Application):
    """Application registered under the name ``USIM``; adds no operations of its own."""

    # Default AID that ``Card`` tries to select for this application.
    aid = "a0000000871002"
    name = "USIM"


class ISDR(Application):
    """Application registered under the name ``ISDR``, with the eUICC queries
    that are sent through ``store_data_tlv``."""

    aid = "A0000005591010FFFFFFFF8900000100"
    name = "ISDR"
    alternative_aids = [
        "A0000005591010FFFFFFFF8900050500",  # 5Ber.esim
    ]

    @operation
    def get_euicc_challenge(self) -> str:
        """Send ``GetEuiccChallenge`` and return the ``euicc_challenge`` value
        of the decoded response."""
        euicc_challenge = self.store_data_tlv(GetEuiccChallenge)
        return euicc_challenge.to_dict().get("get_euicc_challenge")[0][
            "euicc_challenge"
        ]

    @operation
    def get_euicc_info_2(self) -> dict:
        """Send ``EuiccInfo2`` and return the decoded ``euicc_info2`` entries
        merged into a single dict."""
        command = self.store_data_tlv(EuiccInfo2)
        euicc_info = command.to_dict().get("euicc_info2")
        return self._merge_dicts(euicc_info)

    @operation
    def get_configured_addresses(self) -> dict:
        """Send ``EuiccConfiguredAddresses`` and return the decoded
        ``euicc_configured_addresses`` entries merged into a single dict."""
        command = self.store_data_tlv(EuiccConfiguredAddresses)
        addresses = command.to_dict().get("euicc_configured_addresses")
        return self._merge_dicts(addresses)

    @operation
    def get_eid(self) -> str:
        """Send ``GetEuiccData`` with tag list ``[0x5A]`` and return the
        ``eid_value`` of the decoded response."""

        # ``store_data_tlv`` calls its first argument to obtain the command,
        # so hand it a factory instead of an already-built instance.
        def build_command():
            return GetEuiccData(children=[TagList(decoded=[0x5A])])

        eid_data = self.store_data_tlv(build_command, GetEuiccData)
        return eid_data.to_dict().get("get_euicc_data")[0]["eid_value"]

    def _resolve_smdpp_address(self, profile: Profile):
        """Return the profile's SM-DP+ address, falling back to the address
        configured on the card when the profile has none."""
        if profile.smdpp_address:
            return profile.smdpp_address
        # The decoded response exposes the default address under
        # ``default_dp_address`` (as seen in the recorded notebook output);
        # it has no ``smdpp_address`` key, so the old lookup always gave None.
        return self.get_configured_addresses().get("default_dp_address")

    def initiate_authentication(self, profile: Profile):
        """Log the SM-DP+ address for ``profile`` and return a fresh eUICC challenge."""
        smdpp_address = self._resolve_smdpp_address(profile)

        logging.info(f"Initiating authentication with {smdpp_address}")
        challenge = self.get_euicc_challenge()
        return challenge

    @operation
    def download_profile(self, profile: Profile):
        """Log the SM-DP+ address for ``profile`` and request an eUICC challenge.

        Nothing is done with the challenge and the method returns ``None``.
        """
        smdpp_address = self._resolve_smdpp_address(profile)

        logging.info(f"Downloading profile from {smdpp_address}")
        self.get_euicc_challenge()


class ECDSA(Application):
    """Application registered under the name ``ECDSA``; adds no operations of its own."""

    # Default AID that ``Card`` tries to select for this application.
    aid = "A0000005591010FFFFFFFF8900000200"
    name = "ECDSA"


class ISDP(Application):
    """Application registered under the name ``ISDP``; adds no operations of its own."""

    # Default AID that ``Card`` tries to select for this application.
    aid = "A0000005591010FFFFFFFF8900000300"
    name = "ISDP"


class Card:
    """A card reachable through ``link`` together with the applications found on it.

    On construction every known application is probed by selecting its AID
    (the default first, then any alternatives); the first AID that can be
    selected is used to instantiate the application.
    """

    # Maps each detected Application subclass to its instance. Created per
    # card in ``__init__``: a class-level dict would be shared by every Card
    # and keep entries bound to the links of earlier cards.
    supported_applications: dict[type[Application], Application]

    def __init__(
        self,
        link: PcscSimLink,
        recorder: OperationRecorder | None = None,
    ):
        """Probe the card behind ``link`` for the known applications.

        Args:
            link: Transport used to talk to the card.
            recorder: Optional recorder handed to every detected application.
        """
        self.link = link
        self.supported_applications = {}
        applications: list[type[Application]] = [USIM, ISDR, ECDSA, ISDP]
        self.commands = SimCardCommands(self.link)
        self.commands.cla_byte = "00"
        self.executed_commands = []
        self.atr = self.commands.get_atr()

        for application in applications:
            for aid in [application.aid, *application.alternative_aids]:
                try:
                    self.commands.select_adf(aid)
                    self.supported_applications[application] = application(
                        self.link, aid, recorder
                    )
                    break
                except Exception as exc:
                    # Probing is best effort: a failure means "not available
                    # under this AID". Leave a trace instead of hiding it.
                    logging.debug(
                        f"Application {application.name} not available via AID {aid}: {exc}"
                    )

    @property
    def isd_r(self) -> ISDR:
        """Select the ISDR application on the card and return it.

        Raises:
            Exception: If no ISDR application was detected on this card.
        """
        isd_r = self.supported_applications.get(ISDR)
        if not isd_r:
            raise Exception("ISDR application not supported")

        # Select the AID that was detected for this card; it may be one of
        # the alternative AIDs rather than the class default.
        self.commands.select_adf(isd_r.aid)
        return isd_r

In [3]:
from resimulate.euicc.transport.pcsc_link import PcscLink


# Recorder handed to every application so intercepted operations are captured.
recorder = OperationRecorder()

# Open the reader, probe the card, then run three ISDR queries and print each result.
with PcscLink() as link:
    card = Card(link, recorder)
    # Names of the applications that could be selected on this card.
    print([app.name for app in card.supported_applications.values()])
    print(card.isd_r.get_euicc_info_2())
    print(card.isd_r.get_euicc_challenge())
    print(card.isd_r.get_configured_addresses())

DEBUG:root:Disconnected from device HID Global OMNIKEY 3x21 Smart Card Reader [OMNIKEY 3x21 Smart Card Reader] 00 00
DEBUG:root:Connecting to device HID Global OMNIKEY 3x21 Smart Card Reader [OMNIKEY 3x21 Smart Card Reader] 00 00 using protocol T1
DEBUG:root:Connected to device HID Global OMNIKEY 3x21 Smart Card Reader [OMNIKEY 3x21 Smart Card Reader] 00 00
DEBUG:root:Intercepted: ISDR-get_euicc_info_2((), {}) -> Mutated: ()
DEBUG:root:Intercepted: ISDR-get_euicc_challenge((), {}) -> Mutated: ()
DEBUG:root:Recorded: get_euicc_info_2((), {}) with mutation None -> 1 store_data calls
DEBUG:root:Intercepted: ISDR-get_configured_addresses((), {}) -> Mutated: ()
DEBUG:root:Recorded: get_euicc_challenge((), {}) with mutation None -> 1 store_data calls
DEBUG:root:Disconnected from device HID Global OMNIKEY 3x21 Smart Card Reader [OMNIKEY 3x21 Smart Card Reader] 00 00


['USIM', 'ISDR', 'ECDSA']
{'profile_version': '2.3.1', 'svn': '2.3.0', 'euicc_firmware_ver': '36.17.4', 'ext_card_resource': '81010b82040016cec083027fe3', 'uicc_capability': '067f36f7c0', 'ts102241_version': '15.1.0', 'global_platform_version': '2.3.0', 'rsp_capability': '0398', 'euicc_ci_pki_list_for_verification': [{'subject_key_identifier': '81370f5125d0b1d408d4c3b232e6d25e795bebfb'}], 'euicc_ci_pki_list_for_signing': [{'subject_key_identifier': '81370f5125d0b1d408d4c3b232e6d25e795bebfb'}], 'unknown_ber_tlv_ie_99': {'raw': '0640'}, 'pp_version': '1.0.0', 'ss_acreditation_number': 'KN-DN-UP-0924'}
d9c8157ff1944a081d6bea9a4aa6d34a
{'default_dp_address': 'smdp-plus-0.eu.cd.rsp.kigen.com', 'root_ds_address': 'lpa.ds.gsma.com'}


In [None]:
# Pickle the recorder (and its history) so the session can be replayed later.
recorder.save_file("operation_recorder.pkl")

In [None]:
# Replay the saved session against the card; a fresh recorder is passed to
# the applications of the new Card.
# NOTE(review): this cell opens the reader with pySim's PcscSimLink, whereas the
# recording cell used resimulate's PcscLink - confirm PcscSimLink can be built
# without arguments and used as a context manager, or switch to PcscLink.
recorder = OperationRecorder()
with PcscSimLink() as link:
    card = Card(link, recorder)
    recorder.replay("operation_recorder.pkl", card)