Recipes

Pim Feltkamp edited this page Apr 26, 2026 · 1 revision

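Practical, copyable patterns for the cryptohopper Python SDK. They use only the public SDK surface, never internals. The first recipe is a complete script; where a later snippet refers to ch without creating it, it assumes an open client built the same way, with your token in the CRYPTOHOPPER_TOKEN environment variable.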

The SDK is synchronous and built on httpx. If you need async, wrap calls in asyncio.to_thread or run them in a thread pool — there's no separate AsyncCryptohopperClient (yet).
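A minimal sketch of the asyncio.to_thread approach, assuming Python 3.9+ and that one client can be shared across threads (as the thread-pool recipe further down states). The helper name fetch_positions_async is hypothetical and not part of the SDK; ch is any open client exposing hoppers.positions.

```python
import asyncio

async def fetch_positions_async(ch, hopper_ids):
    """Fetch positions for several hoppers without blocking the event loop.

    Each blocking ch.hoppers.positions call runs in a worker thread via
    asyncio.to_thread; gather awaits them all and preserves input order.
    """
    ids = list(hopper_ids)
    tasks = [asyncio.to_thread(ch.hoppers.positions, hid) for hid in ids]
    results = await asyncio.gather(*tasks)
    return dict(zip(ids, results))

# Usage, inside your own coroutine with ch an open CryptohopperClient:
#     positions = await fetch_positions_async(ch, [1, 2, 3])
```

The calls still count against the same rate bucket as synchronous calls would, so this changes how you wait, not how much you may request.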

Use the client as a context manager

The client owns an httpx.Client connection pool. Using the client in a with block closes the pool cleanly when you're done.

import os
from cryptohopper import CryptohopperClient

with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
    me = ch.user.get()
    print(me["email"])

Outside a with block, call ch.close() explicitly when you're done. A leaked pool keeps file descriptors open.
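For the explicit-close case, a try/finally block gives the same guarantee as with. This is a sketch only: fetch_email and the make_client factory are hypothetical names, and the only SDK calls used are user.get and close, both shown in this recipe.

```python
def fetch_email(make_client):
    """Create a client, use it, and always close it.

    make_client is a hypothetical zero-argument factory, for example
    lambda: CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]).
    """
    ch = make_client()
    try:
        return ch.user.get()["email"]
    finally:
        ch.close()  # runs on success and when user.get() raises
```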

Wait for a backtest to finish

Backtests run asynchronously on the server: backtest.create returns immediately with an ID, and you poll backtest.get until the status is terminal.

import time
from cryptohopper import CryptohopperClient

def run_backtest(ch: CryptohopperClient, hopper_id: int, from_date: str, to_date: str) -> dict:
    submitted = ch.backtest.create({
        "hopper_id": hopper_id,
        "start_date": from_date,
        "end_date": to_date,
    })
    bt_id = submitted["id"]

    while True:
        bt = ch.backtest.get(bt_id)
        if bt.get("status") in {"completed", "failed"}:
            return bt
        time.sleep(5)

The backtest rate bucket is separate (1 request per 2 seconds). 5-second polling stays well clear.
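The loop above never gives up if a backtest stalls. A variant with a deadline might look like the sketch below; wait_for_backtest, its parameters, and the 10-minute default are assumptions for illustration, while the status values are the two used above.

```python
import time

def wait_for_backtest(ch, bt_id, poll_seconds=5.0, timeout_seconds=600.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll ch.backtest.get(bt_id) until a terminal status or the deadline.

    sleep and clock are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        bt = ch.backtest.get(bt_id)
        if bt.get("status") in {"completed", "failed"}:
            return bt
        # Stop if the next poll would land after the deadline.
        if clock() + poll_seconds > deadline:
            raise TimeoutError(
                f"backtest {bt_id} not finished after {timeout_seconds}s"
            )
        sleep(poll_seconds)
```

Raising the built-in TimeoutError keeps the stalled case distinct from a backtest that finished with status "failed".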

Find every open position across all your hoppers

import os
from cryptohopper import CryptohopperClient

with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
    for h in ch.hoppers.list():
        positions = ch.hoppers.positions(h["id"])
        for p in positions:
            print(f'{h.get("name")} (#{h["id"]}): {p.get("amount")} {p.get("coin")} @ {p.get("rate")}')

This is sequential — one request per hopper. With 50+ hoppers, see the thread-pool recipe below for parallelisation.
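Once the positions are collected, a common next step is totalling exposure per coin. The sketch below assumes each position is a dict with the coin and amount fields printed above, and that amounts may arrive as strings or numbers; total_by_coin is a hypothetical helper, not an SDK function.

```python
from collections import defaultdict
from decimal import Decimal

def total_by_coin(positions):
    """Sum position amounts per coin.

    Amounts go through Decimal(str(...)) so string and float inputs add up
    without binary floating-point drift. Positions missing either field
    are skipped.
    """
    totals = defaultdict(Decimal)  # Decimal() is zero
    for p in positions:
        coin, amount = p.get("coin"), p.get("amount")
        if coin is None or amount is None:
            continue
        totals[coin] += Decimal(str(amount))
    return dict(totals)

# Usage: collect every hopper's positions into one list, then
#     print(total_by_coin(all_positions))
```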

Detect new fills since the last poll

import os
import time
from cryptohopper import CryptohopperClient

seen: set[int | str] = set()  # order IDs already reported

def poll_fills(ch: CryptohopperClient, hopper_id: int) -> None:
    for o in ch.hoppers.orders(hopper_id):
        oid = o.get("id")
        if oid is not None and oid not in seen and o.get("status") == "filled":
            seen.add(oid)
            print(f'Fill: {o["market"]} {o["type"]} {o["amount"]} @ {o["price"]}')

with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
    while True:
        poll_fills(ch, hopper_id=42)
        time.sleep(10)

For production-grade fill notifications, configure the webhooks resource — push beats poll for event delivery.
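If you stay with polling, note that the first pass of the loop above reports every historical fill as new. One way around that, sketched here with a hypothetical FillTracker class, is to treat the first poll as a baseline. It relies only on the id and status fields used above.

```python
class FillTracker:
    """Remember which filled orders were already reported for one hopper."""

    def __init__(self):
        self._seen = set()
        self._primed = False

    def new_fills(self, orders):
        """Return filled orders not seen on an earlier call.

        The first call only records existing fills and returns [], so a
        restart does not re-announce old history.
        """
        fresh = []
        for o in orders:
            oid = o.get("id")
            if oid is None or o.get("status") != "filled" or oid in self._seen:
                continue
            self._seen.add(oid)
            fresh.append(o)
        if not self._primed:
            self._primed = True
            return []
        return fresh

# Usage, with ch an open CryptohopperClient:
#     tracker = FillTracker()
#     for o in tracker.new_fills(ch.hoppers.orders(42)): ...
```

Keeping the filter separate from the HTTP call also makes it testable with plain dicts.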

Fail fast on auth errors, retry on transient ones

The SDK auto-retries 429s. For 5xx and network errors you may want a tighter retry; auth errors should never be retried.

import time
from cryptohopper import CryptohopperClient, CryptohopperError

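def with_retry(fn, max_attempts: int = 3):
    for attempt in range(max_attempts):
        try:
            return fn()
        except CryptohopperError as e:
            # Auth, not-found, and validation errors will not succeed on retry.
            if e.code in {"UNAUTHORIZED", "FORBIDDEN", "NOT_FOUND", "VALIDATION_ERROR"}:
                raise
            # Out of attempts: surface the last error to the caller.
            if attempt == max_attempts - 1:
                raise
            time.sleep(0.5 * (2 ** attempt))  # exponential backoff: 0.5s, 1s, ...

# ch is an open CryptohopperClient.
me = with_retry(ch.user.get)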
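When many workers retry on the same schedule, they tend to hit the API in lockstep. A common remedy is to randomise each delay ("full jitter"). The sketch below is one way to do that; with_jittered_retry, backoff_delays, and the 8-second cap are illustrative assumptions. To stay runnable without the SDK installed it catches Exception and inspects a code attribute; in real code you would catch CryptohopperError as above.

```python
import random
import time

# Codes the recipe above treats as never worth retrying.
FATAL_CODES = {"UNAUTHORIZED", "FORBIDDEN", "NOT_FOUND", "VALIDATION_ERROR"}

def backoff_delays(max_attempts=3, base=0.5, cap=8.0):
    """Upper bounds for the sleeps between attempts: base * 2**n, capped."""
    return [min(cap, base * (2 ** n)) for n in range(max_attempts - 1)]

def with_jittered_retry(fn, max_attempts=3, base=0.5, cap=8.0,
                        sleep=time.sleep, rng=random.random):
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(max_attempts, base, cap)
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as e:  # in real code: except CryptohopperError
            code = getattr(e, "code", None)
            # Re-raise non-SDK errors, fatal codes, and the final failure.
            if code is None or code in FATAL_CODES or attempt == max_attempts - 1:
                raise
            # Full jitter: sleep a random fraction of the backoff bound.
            sleep(delays[attempt] * rng())
```

With the defaults, the sleep bounds are 0.5s and 1.0s, the same schedule as with_retry, but each actual sleep falls somewhere between zero and the bound.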

Read your remaining backtest quota

limits = ch.backtest.limits()
print(f"Backtests remaining: {limits.get('remaining')} of {limits.get('limit')}")

For the normal and order buckets there's no explicit quota endpoint — the only signal is Retry-After on a 429 (read it via error.retry_after_ms).

Run multiple SDK calls in parallel from a thread pool

The SDK is synchronous but thread-safe, so you can share one client across threads.

from concurrent.futures import ThreadPoolExecutor

# ch is an open CryptohopperClient shared by all worker threads.
def positions_for(hopper_id: int) -> list:
    return ch.hoppers.positions(hopper_id)

hopper_ids = [h["id"] for h in ch.hoppers.list()]

with ThreadPoolExecutor(max_workers=10) as pool:
    # pool.map preserves input order, so IDs and results line up.
    results = list(zip(hopper_ids, pool.map(positions_for, hopper_ids)))

for hopper_id, positions in results:
    print(hopper_id, len(positions))

Each in-flight call counts against the normal bucket (30 req/min). With many concurrent calls, expect 429s — the SDK will retry transparently.
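If you would rather avoid the 429s than absorb them, you can pace calls on the client side. The sketch below spaces calls evenly at the stated 30 req/min (one every 2 seconds); MinIntervalThrottle is a hypothetical helper, and even spacing is an assumption, since the server's bucket may tolerate short bursts.

```python
import threading
import time

class MinIntervalThrottle:
    """Space calls at least `interval` seconds apart, across threads."""

    def __init__(self, interval, clock=time.monotonic, sleep=time.sleep):
        self._interval = interval
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_slot = None  # earliest time the next call may start

    def wait(self):
        # Reserve a slot under the lock, then sleep outside it so other
        # threads can reserve their own slots in the meantime.
        with self._lock:
            now = self._clock()
            slot = now if self._next_slot is None else max(now, self._next_slot)
            self._next_slot = slot + self._interval
        delay = slot - now
        if delay > 0:
            self._sleep(delay)

throttle = MinIntervalThrottle(interval=60 / 30)  # 30 req/min

# Usage: call throttle.wait() as the first line of positions_for().
```

The trade-off is latency: with ten workers and a 2-second interval, the pool is mostly waiting, so a smaller max_workers does the same job.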

Bring your own httpx.Client (proxies, mTLS, instrumentation)

import os
import httpx
from cryptohopper import CryptohopperClient

custom = httpx.Client(
    proxy="http://corp-proxy:8080",
    verify="/etc/ssl/certs/corp-ca.pem",
    event_hooks={
        "request": [lambda req: print(f"-> {req.method} {req.url}")],
        "response": [lambda res: print(f"<- {res.status_code} {res.url}")],
    },
)

with CryptohopperClient(
    api_key=os.environ["CRYPTOHOPPER_TOKEN"],
    http_client=custom,
) as ch:
    ch.user.get()

When you pass http_client, the SDK uses it as-is. The SDK's own timeout and base_url settings still apply because they are set per request, and in httpx a per-request timeout takes precedence over the client-level one. Configure the timeout on the SDK rather than on your custom client so the two don't disagree.
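The event_hooks shown above can do more than print. As a sketch, the hypothetical make_status_counter below builds a response hook that tallies status codes, which should make retried 429s visible even though the SDK hides them from your calling code. It relies only on the status_code attribute of the response object that httpx passes to response hooks.

```python
from collections import Counter

def make_status_counter():
    """Return (event_hooks, counts) for use with httpx.Client(event_hooks=...).

    counts maps HTTP status code to the number of responses seen so far.
    """
    counts = Counter()

    def on_response(response):
        counts[response.status_code] += 1

    return {"response": [on_response]}, counts

# Usage:
#     hooks, counts = make_status_counter()
#     custom = httpx.Client(event_hooks=hooks)
#     ... run your workload through CryptohopperClient(http_client=custom) ...
#     print(counts[429], "rate-limited responses")
```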

Tighten timeouts for short-lived workers

The default timeout is 30 seconds. Inside an AWS Lambda function configured with a 15-second timeout, or any other short-lived worker, the default outlives your invocation, so you get a confusing "function killed" error instead of a clean SDK timeout.

import os
from cryptohopper import CryptohopperClient

ch = CryptohopperClient(
    api_key=os.environ["CRYPTOHOPPER_TOKEN"],
    timeout=6.0,        # assuming the timeout applies per attempt: 2 x 6s fits a 15s budget
    max_retries=1,      # one retry, with a few seconds left over for backoff
)

A CryptohopperError with code == "TIMEOUT" is much easier to handle than a process kill.
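To size the timeout from the actual time left rather than a fixed number, you can divide the remaining budget across attempts. This is a sketch under two assumptions: that the SDK timeout applies to each attempt separately, and that a 3-second reserve covers backoff and your own cleanup. per_attempt_timeout is a hypothetical helper.

```python
def per_attempt_timeout(remaining_ms, max_retries=1, reserve_ms=3000):
    """Split the remaining time budget evenly across all attempts.

    Returns seconds, suitable for the client's timeout argument.
    """
    attempts = max_retries + 1
    usable_ms = remaining_ms - reserve_ms
    if usable_ms <= 0:
        raise ValueError("not enough time left for a request")
    return usable_ms / attempts / 1000.0

# In an AWS Lambda handler, the context object reports the time left:
#     timeout = per_attempt_timeout(context.get_remaining_time_in_millis())
```

For a 15-second budget with one retry this gives (15000 - 3000) / 2 = 6000 ms, or 6 seconds per attempt.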

Disable the SDK's built-in retry and handle 429 yourself

import os
from cryptohopper import CryptohopperClient, CryptohopperError

ch = CryptohopperClient(
    api_key=os.environ["CRYPTOHOPPER_TOKEN"],
    max_retries=0,
)

try:
    ch.hoppers.list()
except CryptohopperError as e:
    if e.code == "RATE_LIMITED":
        print(f"Rate limited; server says wait {e.retry_after_ms}ms")
        # your custom queue / circuit breaker / etc.
    else:
        raise

Useful when you have your own queue, want exact backoff control, or are running inside something that already does retries (Celery, RQ, Airflow).
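The simplest custom policy is to wait exactly as long as the server asks and try again. The sketch below does that a bounded number of times; call_with_rate_limit_wait and its defaults are hypothetical, and it reads the code and retry_after_ms attributes shown above. It catches Exception only so it runs without the SDK installed; in real code, catch CryptohopperError.

```python
import time

def call_with_rate_limit_wait(fn, max_waits=3, default_wait_s=1.0,
                              sleep=time.sleep):
    """Call fn(); on RATE_LIMITED, sleep for the server's hint and retry.

    Gives up and re-raises after max_waits waits, or on any other error.
    """
    for waits_used in range(max_waits + 1):
        try:
            return fn()
        except Exception as e:  # in real code: except CryptohopperError
            if getattr(e, "code", None) != "RATE_LIMITED" or waits_used == max_waits:
                raise
            ms = getattr(e, "retry_after_ms", None)
            # Fall back to a fixed wait if the server sent no hint.
            sleep(ms / 1000.0 if ms is not None else default_wait_s)

# Usage, with a client created with max_retries=0:
#     hoppers = call_with_rate_limit_wait(ch.hoppers.list)
```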

Mock the SDK in tests with pytest-httpx

The SDK's own test suite uses pytest-httpx, and your tests can do the same.

import pytest
from cryptohopper import CryptohopperClient

@pytest.fixture
def ch():
    with CryptohopperClient(api_key="test") as client:
        yield client

def test_user_get(httpx_mock, ch):
    httpx_mock.add_response(
        method="GET",
        url="https://api.cryptohopper.com/v1/user/get",
        json={"data": {"id": 42, "email": "alice@example.com"}},
    )
    me = ch.user.get()
    assert me["id"] == 42

def test_rate_limit_retry(httpx_mock, ch):
    httpx_mock.add_response(status_code=429, headers={"Retry-After": "0"})
    httpx_mock.add_response(json={"data": {"id": 42}})
    me = ch.user.get()
    assert me["id"] == 42

The SDK pulls data out of the envelope automatically — your mock returns {"data": ...}, your assertion sees the inner value.
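When the code under test is your own logic rather than the HTTP layer, a plain unittest.mock.MagicMock standing in for the client can be enough, with no HTTP mocking at all. In this sketch count_open_positions is a hypothetical piece of application code; the fake only needs the two SDK methods it calls.

```python
from unittest.mock import MagicMock

def count_open_positions(ch):
    """Example application logic: total positions across all hoppers."""
    return sum(len(ch.hoppers.positions(h["id"])) for h in ch.hoppers.list())

def test_count_open_positions():
    fake = MagicMock()
    fake.hoppers.list.return_value = [{"id": 1}, {"id": 2}]
    # Hopper N gets N placeholder positions.
    fake.hoppers.positions.side_effect = lambda hopper_id: [{}] * hopper_id
    assert count_open_positions(fake) == 3
    fake.hoppers.positions.assert_any_call(2)
```

The trade-off is that a MagicMock accepts any attribute or call, so it will not catch a misspelled SDK method; the pytest-httpx approach above exercises the real client and does.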

See also