Python client for Amp - a database for blockchain data.
Features:
- Query Client: Issue Flight SQL queries to Amp servers
- Admin Client: Manage datasets, deployments, and jobs programmatically
- Registry Client: Discover, search, and publish datasets to the Registry
- Dataset Inspection: Explore dataset schemas with `inspect()` and `describe()` methods
- Data Loaders: Zero-copy loading into PostgreSQL, Redis, Snowflake, Delta Lake, Iceberg, and more
- Parallel Streaming: High-throughput parallel data ingestion with automatic resume
- Manifest Generation: Fluent API for creating and deploying datasets from SQL queries
- Auto-Refreshing Auth: Seamless authentication with automatic token refresh
- Rust

  ```bash
  brew install rust
  ```

- Ensure you have `uv` installed locally.

- Install dependencies

  ```bash
  uv build
  ```

- Activate a virtual environment (Python 3.13 is the highest version supported)

  ```bash
  brew install python@3.13
  uv venv --python 3.13
  ```
```python
from amp import Client

# Connect to Amp server
client = Client(url="grpc://localhost:8815")

# Execute query and convert to pandas
df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_arrow().to_pandas()
print(df)
```

```python
from amp import Client

# Connect with admin capabilities
client = Client(
    query_url="grpc://localhost:8815",
    admin_url="http://localhost:8080",
    auth_token="your-token"
)

# Register and deploy a dataset
job = (
    client.sql("SELECT block_num, hash FROM eth.blocks")
    .with_dependency('eth', '_/eth_firehose@1.0.0')
    .register_as('_', 'my_dataset', '1.0.0', 'blocks', 'mainnet')
    .deploy(parallelism=4, end_block='latest', wait=True)
)
print(f"Deployment completed: {job.status}")
```

```python
# Load query results into PostgreSQL
result = client.sql("SELECT * FROM eth.blocks").load(
    connection='my_pg_connection',
    destination='eth_blocks'
)
print(f"Loaded {result.rows_loaded} rows")
```

The client supports three authentication methods (in priority order):
```python
from amp import Client

# 1. Explicit token (highest priority)
client = Client(
    url="grpc://localhost:8815",
    auth_token="your-token"
)

# 2. Environment variable
# export AMP_AUTH_TOKEN="your-token"
client = Client(url="grpc://localhost:8815")

# 3. Shared auth file (auto-refresh, recommended)
# Uses ~/.amp/cache/amp_cli_auth (shared with TypeScript CLI)
client = Client(
    url="grpc://localhost:8815",
    auth=True  # Automatically refreshes expired tokens
)
```

```python
from amp import Client

# Connect with registry support
client = Client(
    query_url="grpc://localhost:8815",
    registry_url="https://api.registry.amp.staging.thegraph.com",
    auth=True
)

# Search for datasets
results = client.registry.datasets.search('ethereum blocks')
for dataset in results.datasets[:5]:
    print(f"{dataset.namespace}/{dataset.name} - {dataset.description}")

# Get dataset details
dataset = client.registry.datasets.get('edgeandnode', 'ethereum-mainnet')
print(f"Latest version: {dataset.latest_version}")

# Inspect dataset schema
client.registry.datasets.inspect('edgeandnode', 'ethereum-mainnet')
```

Explore dataset schemas before querying:
```python
from amp.registry import RegistryClient

client = RegistryClient()

# Pretty-print dataset structure (interactive)
client.datasets.inspect('edgeandnode', 'ethereum-mainnet')
# Output:
# Dataset: edgeandnode/ethereum-mainnet@latest
#
# blocks (21 columns)
#   block_num    UInt64                  NOT NULL
#   timestamp    Timestamp(Nanosecond)   NOT NULL
#   hash         FixedSizeBinary(32)     NOT NULL
#   ...

# Get structured schema data (programmatic)
schema = client.datasets.describe('edgeandnode', 'ethereum-mainnet')

# Find tables with specific columns
for table_name, columns in schema.items():
    col_names = [col['name'] for col in columns]
    if 'block_num' in col_names:
        print(f"Table '{table_name}' has block_num column")

# Find all address columns (20-byte binary)
for table_name, columns in schema.items():
    addresses = [col['name'] for col in columns if col['type'] == 'FixedSizeBinary(20)']
    if addresses:
        print(f"{table_name}: {', '.join(addresses)}")
```

Start up a marimo workspace editor:

```bash
uv run marimo edit
```

The Marimo app will open a new browser tab where you can create a new notebook, view helpful resources, and browse existing notebooks in the workspace.
You can execute Python apps and scripts using `uv run <path>`, which gives them access to the dependencies and the `amp` package. For example, you can run the execute_query app with the following command:

```bash
uv run apps/execute_query.py
```

- Admin Client Guide - Complete guide for dataset management and deployment
- Registry Guide - Discover and search datasets in the Registry
- Dataset Inspection - Explore dataset schemas with `inspect()` and `describe()`
- Admin API Reference - Full API documentation for admin operations
- Parallel Streaming Usage Guide - User guide for high-throughput parallel data loading
- Parallel Streaming Design - Technical design documentation for parallel streaming architecture
- Reorganization Handling - Guide for handling blockchain reorganizations
- Implementing Data Loaders - Guide for creating custom data loaders
In order to operate a local Amp server, you will need the files that `dump` produces available locally, and to run the server. You can then use it in your Python scripts, apps, or notebooks.
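Once the server is running, connecting from Python works the same as in the Quick Start; a minimal sketch, assuming the server is listening on the default local URL used above and that the dumped files include the `eth.blocks` table:

```python
from amp import Client

# Assumes a local Amp server on the default Flight SQL URL from the examples above.
client = Client(url="grpc://localhost:8815")

# Assumes the locally dumped data includes the eth.blocks table.
print(client.sql("SELECT block_num FROM eth.blocks LIMIT 5").to_arrow().to_pandas())
```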
The project uses the pytest testing framework and follows standard Python test discovery rules.
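For example, a file following those conventions is picked up automatically (a hypothetical test, not one that exists in the repo):

```python
# tests/unit/test_example.py -- hypothetical file; pytest discovers it because
# both the filename and the function name start with "test_".
def test_block_range_is_positive():
    start, end = 0, 1000
    assert end - start > 0
```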
Run all tests:

```bash
uv run pytest
```

Run only unit tests (fast, no external dependencies):

```bash
make test-unit
```

Run integration tests with automatic container setup:

```bash
make test-integration
```

Run all tests with coverage:

```bash
make test-all
```

Integration tests can run in two modes: automatic (using testcontainers) or manual (using your own database instances).

By default, the integration tests automatically spin up PostgreSQL and Redis containers using testcontainers. This requires Docker to be installed and running.

```bash
# Run integration tests with automatic containers
uv run pytest tests/integration/ -m integration
```

Note: The configuration automatically disables Ryuk (the testcontainers cleanup container) to avoid Docker connectivity issues. If you need Ryuk enabled, set TESTCONTAINERS_RYUK_DISABLED=false.
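For reference, this is roughly how testcontainers hands a throwaway database to a test session; a sketch only, assuming the `postgres:16` image, not a copy of this repo's actual fixtures:

```python
# conftest.py sketch (hypothetical) -- testcontainers starts a disposable
# PostgreSQL container for the test session and removes it afterwards.
# Requires a running Docker daemon.
import pytest
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="session")
def postgres_url():
    with PostgresContainer("postgres:16") as pg:
        yield pg.get_connection_url()
```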
If you prefer to use your own database instances, you can disable testcontainers:
```bash
# Disable testcontainers and use manual configuration
export USE_TESTCONTAINERS=false

# Configure your database connections
export POSTGRES_HOST=localhost
export POSTGRES_PORT=5432
export POSTGRES_DB=test_amp
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=yourpassword
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=yourpassword  # Optional

# Run tests
uv run pytest tests/integration/ -m integration
```

For manual setup, you can use the provided Makefile commands:
```bash
# Start test databases manually
make test-setup

# Run tests
make test-integration

# Clean up databases
make test-cleanup
```

Run tests for specific loaders:

```bash
make test-postgresql   # PostgreSQL tests
make test-redis        # Redis tests
make test-deltalake    # Delta Lake tests
make test-iceberg      # Iceberg tests
make test-lmdb         # LMDB tests
```

Run tests for specific features:

```bash
make test-parallel-streaming  # Parallel streaming integration tests (requires Amp server)
```

Note: Parallel streaming tests require an Amp server. Configure it using environment variables in .test.env:

- `AMP_SERVER_URL` - Amp server URL (e.g., grpc://your-server:80)
- `AMP_TEST_TABLE` - Source table name (e.g., eth_firehose.blocks)
- `AMP_TEST_BLOCK_COLUMN` - Block column name (default: block_num)
- `AMP_TEST_MAX_BLOCK` - Max block for testing (default: 1000)
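For reference, a `.test.env` using the example values above might look like this (values are placeholders; point them at your own Amp server):

```bash
# .test.env -- example values only
AMP_SERVER_URL=grpc://your-server:80
AMP_TEST_TABLE=eth_firehose.blocks
AMP_TEST_BLOCK_COLUMN=block_num
AMP_TEST_MAX_BLOCK=1000
```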
Ruff is configured for linting and formatting of this project.

Run the formatter:

```bash
uv run ruff format
```

Run the linter:

```bash
uv run ruff check .
```

Run the linter and apply auto-fixes:

```bash
uv run ruff check . --fix
```