Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .claude/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@

Python client for the AgentScore trust and reputation API.

## Identity Model

## Methods (sync + async)

- `get_reputation` / `aget_reputation` — cached reputation lookup (free)
- `assess` / `aassess` — identity gate with policy (paid). Accepts `operator_token` for non-wallet agents.
- `create_session` / `acreate_session` — create verification session
- `poll_session` / `apoll_session` — poll session status, returns credential when verified
- `create_credential` / `acreate_credential` — create operator credential (24h TTL default)
- `list_credentials` / `alist_credentials` — list active credentials
- `revoke_credential` / `arevoke_credential` — revoke a credential

## Architecture

Single-package Python library published to PyPI.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
ci:
runs-on: blacksmith-2vcpu-ubuntu-2404
steps:
- uses: actions/checkout@v6
- uses: useblacksmith/checkout@v1
- uses: astral-sh/setup-uv@v7
- run: uv sync --frozen --all-extras
- run: uv run ruff check .
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
runs-on: blacksmith-2vcpu-ubuntu-2404
timeout-minutes: 5
steps:
- uses: actions/checkout@v6
- uses: useblacksmith/checkout@v1

- name: Install osv-scanner
run: |
Expand All @@ -34,7 +34,7 @@ jobs:
runs-on: blacksmith-2vcpu-ubuntu-2404
timeout-minutes: 5
steps:
- uses: actions/checkout@v6
- uses: useblacksmith/checkout@v1
- uses: astral-sh/setup-uv@v7
- uses: actions/setup-python@v6
with:
Expand Down
49 changes: 43 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ print(rep["score"]["value"], rep["score"]["grade"])
# Filter to a specific chain
base_rep = client.get_reputation("0x1234...", chain="base")

# On-the-fly assessment with policy (paid)
result = client.assess("0x1234...", policy={"min_grade": "B", "min_score": 35})
print(result["decision"], result["decision_reasons"])

# Compliance assessment with verification policy
# Identity gate with policy (paid)
gated = client.assess("0x1234...", policy={
"require_kyc": True,
"require_sanctions_clear": True,
Expand All @@ -45,12 +41,53 @@ rep = client.get_reputation("0x1234...")
print(rep.get("verification_level")) # "none" | "wallet_claimed" | "kyc_verified"
```

### Credential-Based Identity

Agents without wallets can use operator credentials for identity:

```python
result = client.assess(operator_token="opc_...")
print(result["decision"]) # "allow" | "deny"
```

### Verification Sessions

Bootstrap identity for first-time agents:

```python
session = client.create_session()
print(session["verify_url"], session["poll_secret"])

status = client.poll_session(session["session_id"], session["poll_secret"])
if status["status"] == "verified":
print(status["operator_token"]) # "opc_..." — use for future requests
```

### Credential Management

```python
cred = client.create_credential(label="my-agent", ttl_days=7)
print(cred["credential"]) # shown once

credentials = client.list_credentials()
client.revoke_credential(cred["id"])
```

### Async

All methods have async variants prefixed with `a`:

```python
async with AgentScore(api_key="as_live_...") as client:
rep = await client.aget_reputation("0x1234...")
result = await client.aassess("0x1234...", policy={"min_grade": "B"})
result = await client.aassess("0x1234...", policy={"require_kyc": True})

# Identity model methods
session = await client.acreate_session()
status = await client.apoll_session(session["session_id"], session["poll_secret"])
cred = await client.acreate_credential(label="my-agent")
await client.alist_credentials()
await client.arevoke_credential(cred["id"])
```

### Context Manager
Expand Down
12 changes: 12 additions & 0 deletions agentscore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
from agentscore.errors import AgentScoreError
from agentscore.types import (
AssessResponse,
CredentialCreateResponse,
CredentialItem,
CredentialListResponse,
DecisionPolicy,
EntityType,
Grade,
OperatorVerification,
Reputation,
ReputationResponse,
ReputationStatus,
SessionCreateRequest,
SessionCreateResponse,
SessionPollResponse,
VerificationLevel,
)

Expand All @@ -20,13 +26,19 @@
"AgentScore",
"AgentScoreError",
"AssessResponse",
"CredentialCreateResponse",
"CredentialItem",
"CredentialListResponse",
"DecisionPolicy",
"EntityType",
"Grade",
"OperatorVerification",
"Reputation",
"ReputationResponse",
"ReputationStatus",
"SessionCreateRequest",
"SessionCreateResponse",
"SessionPollResponse",
"VerificationLevel",
"__version__",
]
135 changes: 129 additions & 6 deletions agentscore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
if TYPE_CHECKING:
from agentscore.types import (
AssessResponse,
CredentialCreateResponse,
CredentialListResponse,
DecisionPolicy,
ReputationResponse,
SessionCreateResponse,
SessionPollResponse,
)


Expand Down Expand Up @@ -58,6 +62,13 @@ def _get_async_client(self) -> httpx.AsyncClient:
return self._async_client

def _handle_response(self, response: httpx.Response) -> dict:
if response.status_code == 429:
retry_after = response.headers.get("retry-after", "1")
raise AgentScoreError(
code="rate_limited",
message=f"Rate limit exceeded. Retry after {retry_after}s",
status_code=429,
)
if response.status_code >= 400:
try:
body = response.json()
Expand Down Expand Up @@ -95,13 +106,18 @@ def get_reputation(self, address: str, chain: str | None = None) -> ReputationRe

def assess(
self,
address: str,
address: str | None = None,
chain: str | None = None,
refresh: bool = False,
policy: DecisionPolicy | None = None,
operator_token: str | None = None,
) -> AssessResponse:
"""Assess a wallet (paid, writes score on-the-fly)."""
body: dict[str, Any] = {"address": address}
"""Assess a wallet or operator (paid, writes score on-the-fly)."""
body: dict[str, Any] = {}
if address:
body["address"] = address
if operator_token:
body["operator_token"] = operator_token
if chain:
body["chain"] = chain
if refresh:
Expand All @@ -112,6 +128,57 @@ def assess(
response = client.post("/v1/assess", json=body)
return self._handle_response(response)

def create_session(
self,
context: str | None = None,
product_name: str | None = None,
) -> SessionCreateResponse:
"""Create an assessment session for deferred scoring."""
body: dict[str, Any] = {}
if context is not None:
body["context"] = context
if product_name is not None:
body["product_name"] = product_name
client = self._get_sync_client()
response = client.post("/v1/sessions", json=body)
return self._handle_response(response)

def poll_session(self, session_id: str, poll_secret: str) -> SessionPollResponse:
"""Poll a session for its current status and result."""
client = self._get_sync_client()
response = client.get(
f"/v1/sessions/{session_id}",
headers={"X-Poll-Secret": poll_secret},
)
return self._handle_response(response)

def create_credential(
self,
label: str | None = None,
ttl_days: int | None = None,
) -> CredentialCreateResponse:
"""Create a new API credential."""
body: dict[str, Any] = {}
if label is not None:
body["label"] = label
if ttl_days is not None:
body["ttl_days"] = ttl_days
client = self._get_sync_client()
response = client.post("/v1/credentials", json=body)
return self._handle_response(response)

def list_credentials(self) -> CredentialListResponse:
"""List all API credentials."""
client = self._get_sync_client()
response = client.get("/v1/credentials")
return self._handle_response(response)

def revoke_credential(self, id: str) -> dict:
"""Revoke an API credential by ID."""
client = self._get_sync_client()
response = client.delete(f"/v1/credentials/{id}")
return self._handle_response(response)

# --- Async methods ---

async def aget_reputation(self, address: str, chain: str | None = None) -> ReputationResponse:
Expand All @@ -125,13 +192,18 @@ async def aget_reputation(self, address: str, chain: str | None = None) -> Reput

async def aassess(
self,
address: str,
address: str | None = None,
chain: str | None = None,
refresh: bool = False,
policy: DecisionPolicy | None = None,
operator_token: str | None = None,
) -> AssessResponse:
"""Assess a wallet (paid, writes score on-the-fly)."""
body: dict[str, Any] = {"address": address}
"""Assess a wallet or operator (paid, writes score on-the-fly)."""
body: dict[str, Any] = {}
if address:
body["address"] = address
if operator_token:
body["operator_token"] = operator_token
if chain:
body["chain"] = chain
if refresh:
Expand All @@ -142,6 +214,57 @@ async def aassess(
response = await client.post("/v1/assess", json=body)
return self._handle_response(response)

async def acreate_session(
self,
context: str | None = None,
product_name: str | None = None,
) -> SessionCreateResponse:
"""Create an assessment session for deferred scoring."""
body: dict[str, Any] = {}
if context is not None:
body["context"] = context
if product_name is not None:
body["product_name"] = product_name
client = self._get_async_client()
response = await client.post("/v1/sessions", json=body)
return self._handle_response(response)

async def apoll_session(self, session_id: str, poll_secret: str) -> SessionPollResponse:
"""Poll a session for its current status and result."""
client = self._get_async_client()
response = await client.get(
f"/v1/sessions/{session_id}",
headers={"X-Poll-Secret": poll_secret},
)
return self._handle_response(response)

async def acreate_credential(
self,
label: str | None = None,
ttl_days: int | None = None,
) -> CredentialCreateResponse:
"""Create a new API credential."""
body: dict[str, Any] = {}
if label is not None:
body["label"] = label
if ttl_days is not None:
body["ttl_days"] = ttl_days
client = self._get_async_client()
response = await client.post("/v1/credentials", json=body)
return self._handle_response(response)

async def alist_credentials(self) -> CredentialListResponse:
"""List all API credentials."""
client = self._get_async_client()
response = await client.get("/v1/credentials")
return self._handle_response(response)

async def arevoke_credential(self, id: str) -> dict:
"""Revoke an API credential by ID."""
client = self._get_async_client()
response = await client.delete(f"/v1/credentials/{id}")
return self._handle_response(response)

def close(self):
if self._sync_client:
self._sync_client.close()
Expand Down
Loading
Loading