Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 90 additions & 44 deletions skills/finance/wallet_screening/skill.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import json
import os
import re
import requests
import glob
from typing import Any, Dict, List, Optional
from datetime import datetime
from skillware.core.base_skill import BaseSkill

_ETH_ADDRESS_RE = re.compile(r"^0x[a-fA-F0-9]{40}$")
_ZERO_WIDTH_CHARS = "\u200b\u200c\u200d\ufeff"


class WalletScreeningSkill(BaseSkill):
"""
Expand All @@ -31,13 +35,17 @@ def __init__(self, config: Optional[Dict[str, Any]] = None):
# Load Additional Datasets dynamically (normalized files, etc.)
self.additional_datasets = self._load_additional_datasets()

# ETH address -> sanctions records (built once; O(1) lookup per screen)
self._sanctions_index: Dict[str, List[Dict]] = {}
self._build_sanctions_index()

@property
def manifest(self) -> Dict[str, Any]:
return {}

def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
address = params.get("address")
if not address or not self._validate_eth_address(address):
address = self.normalize_eth_address(params.get("address") or "")
if not address:
return {"error": "Invalid Ethereum address provided."}

if not self.etherscan_api_key:
Expand All @@ -49,9 +57,8 @@ def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
eth_usd = self._get_price(self.coingecko_url_usd, "usd")
eth_eur = self._get_price(self.coingecko_url_eur, "eur")

# 2. Sanctions Check (Core + Additional)
sanctions_hits = self._check_against_sanctions(address)
sanctions_hits.extend(self._check_against_additional_sanctions(address))
# 2. Sanctions Check (FTM + additional lists via index)
sanctions_hits = self._lookup_sanctions_hits(address)

# 3. Analyze Transactions
analysis = self._analyze_transactions(txs, address)
Expand Down Expand Up @@ -126,10 +133,85 @@ def _load_additional_datasets(self) -> List[Dict]:

# --- API Helpers ---

def _validate_eth_address(self, address: str) -> bool:
return (
isinstance(address, str) and address.startswith("0x") and len(address) == 42
@staticmethod
def normalize_eth_address(address: str) -> Optional[str]:
"""Normalize and validate an Ethereum address (EIP-55 checksum not required)."""
if not isinstance(address, str):
return None
cleaned = address.strip().translate(
{ord(c): None for c in _ZERO_WIDTH_CHARS}
)
if not cleaned.lower().startswith("0x"):
return None
normalized = "0x" + cleaned[2:].lower()
if _ETH_ADDRESS_RE.match(normalized):
return normalized
return None

@staticmethod
def _iter_property_values(properties: Dict, key: str):
raw = properties.get(key)
if raw is None:
return
if isinstance(raw, list):
for item in raw:
if item is not None:
yield str(item)
else:
yield str(raw)

def _eth_addresses_from_record(self, record: Dict) -> List[str]:
"""Collect normalized Ethereum addresses from a sanctions or FTM record."""
found: List[str] = []
if "addresses" in record and isinstance(record["addresses"], list):
for a in record["addresses"]:
norm = self.normalize_eth_address(str(a))
if norm:
found.append(norm)

properties = record.get("properties") or {}
if isinstance(properties, dict):
for key in ("address", "publicKey"):
for value in self._iter_property_values(properties, key):
for part in value.split(","):
norm = self.normalize_eth_address(part)
if norm:
found.append(norm)

if "address" in record and not isinstance(record["address"], list):
norm = self.normalize_eth_address(str(record["address"]))
if norm:
found.append(norm)

return list(dict.fromkeys(found))

def _build_sanctions_index(self) -> None:
"""Index normalized ETH addresses from FTM and additional datasets."""
index: Dict[str, List[Dict]] = {}
for entity in self.sanctions_entities:
addrs = self._eth_addresses_from_record(entity)
if not addrs:
continue
tagged = dict(entity)
tagged["__source_file__"] = "entities.ftm.json"
for addr in addrs:
index.setdefault(addr, []).append(tagged)

for entry in self.additional_datasets:
addrs = self._eth_addresses_from_record(entry)
if not addrs:
continue
tagged = dict(entry)
for addr in addrs:
index.setdefault(addr, []).append(tagged)

self._sanctions_index = index

def _lookup_sanctions_hits(self, address: str) -> List[Dict]:
normalized = self.normalize_eth_address(address)
if not normalized:
return []
return list(self._sanctions_index.get(normalized, []))

def _get_price(self, url: str, currency: str) -> float:
try:
Expand Down Expand Up @@ -179,42 +261,6 @@ def _get_eth_balance(self, address: str) -> float:

# --- Logic Helpers ---

def _check_against_sanctions(self, address: str) -> List[Dict]:
hits = []
lower_addr = address.lower()
for entity in self.sanctions_entities:
matches = False
if "addresses" in entity:
if any(a.lower() == lower_addr for a in entity["addresses"]):
matches = True
elif "properties" in entity and "address" in entity["properties"]:
if entity["properties"]["address"].lower() == lower_addr:
matches = True

if matches:
entity["__source_file__"] = "entities.ftm.json"
hits.append(entity)
return hits

def _check_against_additional_sanctions(self, address: str) -> List[Dict]:
hits = []
lower_addr = address.lower()
for entry in self.additional_datasets:
addr = None
if "address" in entry:
addr = entry["address"]
elif "properties" in entry and "address" in entry["properties"]:
addr = entry["properties"]["address"]
elif "addresses" in entry and isinstance(entry["addresses"], list):
for a in entry["addresses"]:
if a.lower() == lower_addr:
addr = a
break

if addr and addr.lower() == lower_addr:
hits.append(entry)
return hits

def _analyze_transactions(
self, txs: List[Dict], wallet_addr: str
) -> Dict[str, Any]:
Expand Down
120 changes: 108 additions & 12 deletions tests/skills/finance/test_wallet_screening.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
def get_skill():
bundle = SkillLoader.load_skill("finance/wallet_screening")
# Initialize without needing real API keys
return bundle['module'].WalletScreeningSkill()
return bundle["module"].WalletScreeningSkill()


@patch("skills.finance.wallet_screening.skill.requests.get")
Expand All @@ -15,19 +15,25 @@ def test_wallet_screening_success(mock_get):

# Mock responses
mock_eth_balance = MagicMock()
mock_eth_balance.json.return_value = {"status": "1", "result": "1000000000000000000"} # 1 ETH
mock_eth_balance.json.return_value = {
"status": "1",
"result": "1000000000000000000",
} # 1 ETH

mock_txs = MagicMock()
mock_txs.json.return_value = {"status": "1", "result": [
{
"from": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".lower(),
"to": "0x123",
"value": "500000000000000000",
"isError": "0",
"gasUsed": "21000",
"gasPrice": "1000000000"
}
]}
mock_txs.json.return_value = {
"status": "1",
"result": [
{
"from": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045".lower(),
"to": "0x123",
"value": "500000000000000000",
"isError": "0",
"gasUsed": "21000",
"gasPrice": "1000000000",
}
],
}

mock_price = MagicMock()
mock_price.json.return_value = {"ethereum": {"usd": 2000.0, "eur": 1800.0}}
Expand Down Expand Up @@ -66,3 +72,93 @@ def test_wallet_screening_missing_key():
result = skill.execute({"address": "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"})
assert "error" in result
assert "Missing ETHERSCAN_API_KEY" in result["error"]


# Known ETH vector from entities.ftm.json (properties.publicKey)
SANCTIONED_ETH = "0x7eab248c014d1043e54e96ac4f31ec7d9a97ffd3"


def _mock_etherscan_empty(mock_get):
mock_eth_balance = MagicMock()
mock_eth_balance.json.return_value = {"status": "1", "result": "0"}
mock_txs = MagicMock()
mock_txs.json.return_value = {"status": "1", "result": []}
mock_price = MagicMock()
mock_price.json.return_value = {"ethereum": {"usd": 2000.0, "eur": 1800.0}}

def get_side_effect(url, **kwargs):
params = kwargs.get("params") or {}
if params.get("action") == "balance":
return mock_eth_balance
if params.get("action") == "txlist":
return mock_txs
return mock_price

mock_get.side_effect = get_side_effect


@patch("skills.finance.wallet_screening.skill.requests.get")
def test_ftm_publickey_eth_sanctions_match(mock_get):
skill = get_skill()
skill.etherscan_api_key = "dummy_key"
skill.sanctions_entities = [
{
"id": "test-ftm-wallet",
"schema": "CryptoWallet",
"caption": "sanctioned-test-wallet",
"properties": {
"publicKey": [SANCTIONED_ETH],
"topics": ["crime.terror"],
},
}
]
skill.additional_datasets = []
skill.malicious_contracts = []
skill._build_sanctions_index()
_mock_etherscan_empty(mock_get)

result = skill.execute({"address": SANCTIONED_ETH})

assert "error" not in result
assert result["summary"]["sanctioned_entity_match"] is True
assert len(result["risk_details"]["sanctions_hits"]) >= 1


@patch("skills.finance.wallet_screening.skill.requests.get")
def test_publickey_comma_separated_eth_match(mock_get):
skill = get_skill()
skill.etherscan_api_key = "dummy_key"
target = "0xc8fe1c81e927540fcc99ebb3c880a840082293da"
skill.sanctions_entities = [
{
"caption": "multi-chain-wallet",
"properties": {
"publicKey": [
"0xc8fe1c81e927540fcc99ebb3c880a840082293da, TR2nTb64cQMx6tqFwisoC6o7barFWHhPiw"
],
},
}
]
skill.additional_datasets = []
skill.malicious_contracts = []
skill._build_sanctions_index()
_mock_etherscan_empty(mock_get)

result = skill.execute({"address": target})

assert result["summary"]["sanctioned_entity_match"] is True


def test_normalize_eth_address_strips_zero_width():
skill = get_skill()
raw = "0x" + "\u200b" + "7eab248c014d1043e54e96ac4f31ec7d9a97ffd3"
assert skill.normalize_eth_address(raw) == SANCTIONED_ETH


def test_sanctions_index_real_ftm_publickey_vector():
skill = get_skill()
assert SANCTIONED_ETH in skill._sanctions_index
hits = skill._lookup_sanctions_hits(SANCTIONED_ETH)
assert len(hits) >= 1
assert hits[0]["__source_file__"] == "entities.ftm.json"
assert SANCTIONED_ETH in hits[0].get("properties", {}).get("publicKey", [])