diff --git a/skills/finance/wallet_screening/skill.py b/skills/finance/wallet_screening/skill.py index 82fd586..ef64d16 100644 --- a/skills/finance/wallet_screening/skill.py +++ b/skills/finance/wallet_screening/skill.py @@ -1,11 +1,15 @@ import json import os +import re import requests import glob from typing import Any, Dict, List, Optional from datetime import datetime from skillware.core.base_skill import BaseSkill +_ETH_ADDRESS_RE = re.compile(r"^0x[a-fA-F0-9]{40}$") +_ZERO_WIDTH_CHARS = "\u200b\u200c\u200d\ufeff" + class WalletScreeningSkill(BaseSkill): """ @@ -31,13 +35,17 @@ def __init__(self, config: Optional[Dict[str, Any]] = None): # Load Additional Datasets dynamically (normalized files, etc.) self.additional_datasets = self._load_additional_datasets() + # ETH address -> sanctions records (built once; O(1) lookup per screen) + self._sanctions_index: Dict[str, List[Dict]] = {} + self._build_sanctions_index() + @property def manifest(self) -> Dict[str, Any]: return {} def execute(self, params: Dict[str, Any]) -> Dict[str, Any]: - address = params.get("address") - if not address or not self._validate_eth_address(address): + address = self.normalize_eth_address(params.get("address") or "") + if not address: return {"error": "Invalid Ethereum address provided."} if not self.etherscan_api_key: @@ -49,9 +57,8 @@ def execute(self, params: Dict[str, Any]) -> Dict[str, Any]: eth_usd = self._get_price(self.coingecko_url_usd, "usd") eth_eur = self._get_price(self.coingecko_url_eur, "eur") - # 2. Sanctions Check (Core + Additional) - sanctions_hits = self._check_against_sanctions(address) - sanctions_hits.extend(self._check_against_additional_sanctions(address)) + # 2. Sanctions Check (FTM + additional lists via index) + sanctions_hits = self._lookup_sanctions_hits(address) # 3. Analyze Transactions analysis = self._analyze_transactions(txs, address) @@ -126,10 +133,85 @@ def _load_additional_datasets(self) -> List[Dict]: # --- API Helpers --- - def _validate_eth_address(self, address: str) -> bool: - return ( - isinstance(address, str) and address.startswith("0x") and len(address) == 42 + @staticmethod + def normalize_eth_address(address: str) -> Optional[str]: + """Normalize and validate an Ethereum address (EIP-55 checksum not required).""" + if not isinstance(address, str): + return None + cleaned = address.strip().translate( + {ord(c): None for c in _ZERO_WIDTH_CHARS} ) + if not cleaned.lower().startswith("0x"): + return None + normalized = "0x" + cleaned[2:].lower() + if _ETH_ADDRESS_RE.match(normalized): + return normalized + return None + + @staticmethod + def _iter_property_values(properties: Dict, key: str): + raw = properties.get(key) + if raw is None: + return + if isinstance(raw, list): + for item in raw: + if item is not None: + yield str(item) + else: + yield str(raw) + + def _eth_addresses_from_record(self, record: Dict) -> List[str]: + """Collect normalized Ethereum addresses from a sanctions or FTM record.""" + found: List[str] = [] + if "addresses" in record and isinstance(record["addresses"], list): + for a in record["addresses"]: + norm = self.normalize_eth_address(str(a)) + if norm: + found.append(norm) + + properties = record.get("properties") or {} + if isinstance(properties, dict): + for key in ("address", "publicKey"): + for value in self._iter_property_values(properties, key): + for part in value.split(","): + norm = self.normalize_eth_address(part) + if norm: + found.append(norm) + + if "address" in record and not isinstance(record["address"], list): + norm = self.normalize_eth_address(str(record["address"])) + if norm: + found.append(norm) + + return list(dict.fromkeys(found)) + + def _build_sanctions_index(self) -> None: + """Index normalized ETH addresses from FTM and additional datasets.""" + index: Dict[str, List[Dict]] = {} + for entity in self.sanctions_entities: + addrs = self._eth_addresses_from_record(entity) + if not addrs: + continue + tagged = dict(entity) + tagged["__source_file__"] = "entities.ftm.json" + for addr in addrs: + index.setdefault(addr, []).append(tagged) + + for entry in self.additional_datasets: + addrs = self._eth_addresses_from_record(entry) + if not addrs: + continue + tagged = dict(entry) + for addr in addrs: + index.setdefault(addr, []).append(tagged) + + self._sanctions_index = index + + def _lookup_sanctions_hits(self, address: str) -> List[Dict]: + normalized = self.normalize_eth_address(address) + if not normalized: + return [] + return list(self._sanctions_index.get(normalized, [])) def _get_price(self, url: str, currency: str) -> float: try: @@ -179,42 +261,6 @@ def _get_eth_balance(self, address: str) -> float: # --- Logic Helpers --- - def _check_against_sanctions(self, address: str) -> List[Dict]: - hits = [] - lower_addr = address.lower() - for entity in self.sanctions_entities: - matches = False - if "addresses" in entity: - if any(a.lower() == lower_addr for a in entity["addresses"]): - matches = True - elif "properties" in entity and "address" in entity["properties"]: - if entity["properties"]["address"].lower() == lower_addr: - matches = True - - if matches: - entity["__source_file__"] = "entities.ftm.json" - hits.append(entity) - return hits - - def _check_against_additional_sanctions(self, address: str) -> List[Dict]: - hits = [] - lower_addr = address.lower() - for entry in self.additional_datasets: - addr = None - if "address" in entry: - addr = entry["address"] - elif "properties" in entry and "address" in entry["properties"]: - addr = entry["properties"]["address"] - elif "addresses" in entry and isinstance(entry["addresses"], list): - for a in entry["addresses"]: - if a.lower() == lower_addr: - addr = a - break - - if addr and addr.lower() == lower_addr: - hits.append(entry) - return hits - def _analyze_transactions( self, txs: List[Dict], wallet_addr: str ) -> Dict[str, Any]: diff --git a/tests/skills/finance/test_wallet_screening.py b/tests/skills/finance/test_wallet_screening.py index e782bba..0e0084f 100644 --- a/tests/skills/finance/test_wallet_screening.py +++ b/tests/skills/finance/test_wallet_screening.py @@ -5,7 +5,7 @@ def get_skill(): bundle = SkillLoader.load_skill("finance/wallet_screening") # Initialize without needing real API keys - return bundle['module'].WalletScreeningSkill() + return bundle["module"].WalletScreeningSkill() @patch("skills.finance.wallet_screening.skill.requests.get") @@ -15,19 +15,25 @@ def test_wallet_screening_success(mock_get): # Mock responses mock_eth_balance = MagicMock() - mock_eth_balance.json.return_value = {"status": "1", "result": "1000000000000000000"} # 1 ETH + mock_eth_balance.json.return_value = { + "status": "1", + "result": "1000000000000000000", + } # 1 ETH mock_txs = MagicMock() - mock_txs.json.return_value = {"status": "1", "result": [ - { - "from": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".lower(), - "to": "0x123", - "value": "500000000000000000", - "isError": "0", - "gasUsed": "21000", - "gasPrice": "1000000000" - } - ]} + mock_txs.json.return_value = { + "status": "1", + "result": [ + { + "from": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".lower(), + "to": "0x123", + "value": "500000000000000000", + "isError": "0", + "gasUsed": "21000", + "gasPrice": "1000000000", + } + ], + } mock_price = MagicMock() mock_price.json.return_value = {"ethereum": {"usd": 2000.0, "eur": 1800.0}} @@ -66,3 +72,93 @@ def test_wallet_screening_missing_key(): result = skill.execute({"address": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"}) assert "error" in result assert "Missing ETHERSCAN_API_KEY" in result["error"] + + +# Known ETH vector from entities.ftm.json (properties.publicKey) +SANCTIONED_ETH = "0x7eab248c014d1043e54e96ac4f31ec7d9a97ffd3" + + +def _mock_etherscan_empty(mock_get): + mock_eth_balance = MagicMock() + mock_eth_balance.json.return_value = {"status": "1", "result": "0"} + mock_txs = MagicMock() + mock_txs.json.return_value = {"status": "1", "result": []} + mock_price = MagicMock() + mock_price.json.return_value = {"ethereum": {"usd": 2000.0, "eur": 1800.0}} + + def get_side_effect(url, **kwargs): + params = kwargs.get("params") or {} + if params.get("action") == "balance": + return mock_eth_balance + if params.get("action") == "txlist": + return mock_txs + return mock_price + + mock_get.side_effect = get_side_effect + + +@patch("skills.finance.wallet_screening.skill.requests.get") +def test_ftm_publickey_eth_sanctions_match(mock_get): + skill = get_skill() + skill.etherscan_api_key = "dummy_key" + skill.sanctions_entities = [ + { + "id": "test-ftm-wallet", + "schema": "CryptoWallet", + "caption": "sanctioned-test-wallet", + "properties": { + "publicKey": [SANCTIONED_ETH], + "topics": ["crime.terror"], + }, + } + ] + skill.additional_datasets = [] + skill.malicious_contracts = [] + skill._build_sanctions_index() + _mock_etherscan_empty(mock_get) + + result = skill.execute({"address": SANCTIONED_ETH}) + + assert "error" not in result + assert result["summary"]["sanctioned_entity_match"] is True + assert len(result["risk_details"]["sanctions_hits"]) >= 1 + + +@patch("skills.finance.wallet_screening.skill.requests.get") +def test_publickey_comma_separated_eth_match(mock_get): + skill = get_skill() + skill.etherscan_api_key = "dummy_key" + target = "0xc8fe1c81e927540fcc99ebb3c880a840082293da" + skill.sanctions_entities = [ + { + "caption": "multi-chain-wallet", + "properties": { + "publicKey": [ + "0xc8fe1c81e927540fcc99ebb3c880a840082293da, TR2nTb64cQMx6tqFwisoC6o7barFWHhPiw" + ], + }, + } + ] + skill.additional_datasets = [] + skill.malicious_contracts = [] + skill._build_sanctions_index() + _mock_etherscan_empty(mock_get) + + result = skill.execute({"address": target}) + + assert result["summary"]["sanctioned_entity_match"] is True + + +def test_normalize_eth_address_strips_zero_width(): + skill = get_skill() + raw = "0x" + "\u200b" + "7eab248c014d1043e54e96ac4f31ec7d9a97ffd3" + assert skill.normalize_eth_address(raw) == SANCTIONED_ETH + + +def test_sanctions_index_real_ftm_publickey_vector(): + skill = get_skill() + assert SANCTIONED_ETH in skill._sanctions_index + hits = skill._lookup_sanctions_hits(SANCTIONED_ETH) + assert len(hits) >= 1 + assert hits[0]["__source_file__"] == "entities.ftm.json" + assert SANCTIONED_ETH in hits[0].get("properties", {}).get("publicKey", [])