Skip to content

devrupt-io/autoken-python

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Autoken Python Client

Official Python client library for Autoken - Universal API Gateway for AI Agent Task Execution.

Installation

pip install autoken

Development Installation (from source):

# Clone and install in editable mode
git clone https://github.com/devrupt-io/autoken-python.git
cd autoken-python
pip install -e ".[dev]"

Quick Start

from autoken import Autoken

# Uses AUTOKEN_API_KEY environment variable by default
client = Autoken()

# Or pass your API key explicitly
# client = Autoken("sk_your_api_key")

# Execute a task - search social media
result = client.task("social-search", {
    "query": "AI agent frameworks",
    "platforms": ["reddit", "x"],
    "limit": 50
})

print(result.output)
# {"results": [...], "itemCount": 50}

Getting Your API Key

  1. Sign up at autoken.devrupt.io
  2. Go to Dashboard > API Keys
  3. Click "Create Key" and copy your new API key (starts with sk_)
  4. Set the environment variable: export AUTOKEN_API_KEY=sk_your_api_key

Usage

Execute a Task

# Rent a proxy
proxy = client.task("proxy-rent", {
    "country": "US",
    "type": "residential",
    "duration": 3600  # 1 hour
})

# Extend an existing rental
extended = client.task("proxy-rent", {
    "rentalId": proxy.output["rentalId"],
    "duration": 3600  # extend by 1 hour
})

With Options

# Use specific version
client.task("translate-text@1.2.0", input_data)

# Or pass version as option
client.task("translate-text", input_data, {"version": "1.2.0"})

# Select provider strategy
client.task("translate-text", input_data, {"price": "lowest"})  # default
client.task("translate-text", input_data, {"price": "fastest"})
client.task("translate-text", input_data, {"price": "best"})

# Use specific provider
client.task("translate-text", input_data, {"provider": "deepl"})

Natural Language

result = client.natural(
    "Search Reddit and X for posts about machine learning startups"
)

Batch Execution

results = client.batch([
    {"task": "translate-text", "input": {"text": "Hello", "targetLang": "es"}},
    {"task": "translate-text", "input": {"text": "World", "targetLang": "fr"}},
])

Check Balance

balance = client.get_balance()
print(f"You have {balance.balance} tokens")

List Tasks

tasks = client.list_tasks()
ai_tasks = client.list_tasks("ai")

Get Task Details and Versions

# Get task with latest version info
task = client.get_task("hello-world")
print(f"Latest version: {task.latestVersion}")
# Output: Latest version: 1.0.0

# List all versions of a task
versions = client.get_task_versions("hello-world")
for version in versions:
    print(f"v{version.version} - {version.changelog}")

# Get a specific version
v1 = client.get_task_version("hello-world", "1.0.0")
print(v1.inputSchema)

Artifacts

Work with binary files (images, audio, PDFs) using the artifact API:

import hashlib

# Upload an artifact
with open("audio.mp3", "rb") as f:
    data = f.read()
    artifact_hash = client.upload_artifact(data, filename="audio.mp3")
print(f"Uploaded: {artifact_hash}")

# Check if artifact exists (useful for deduplication)
exists = client.artifact_exists(artifact_hash)
print(f"Exists: {exists}")

# Download an artifact
content = client.download_artifact(artifact_hash)
with open("downloaded.mp3", "wb") as f:
    f.write(content)

# List all artifacts
artifacts = client.list_artifacts()
for a in artifacts:
    print(f"{a['hash'][:16]}... {a['filename']}")

Async Support

import asyncio
from autoken import AsyncAutoken

async def main():
    # Uses AUTOKEN_API_KEY environment variable by default
    async with AsyncAutoken() as client:
        result = await client.task("translate-text", {
            "text": "Hello world",
            "targetLang": "es"
        })
        print(result.output)

asyncio.run(main())

Error Handling

from autoken import (
    Autoken,
    AuthenticationError,
    InsufficientTokensError,
    TaskNotFoundError,
)

try:
    client.task("translate-text", input_data)
except AuthenticationError:
    print("Invalid API key")
except InsufficientTokensError:
    print("Not enough tokens")
except TaskNotFoundError:
    print("Task does not exist")

CLI

The package includes a powerful command-line interface for executing tasks directly from your terminal.

Setup

# Set your API key (get it from https://autoken.devrupt.io/dashboard/api-keys)
export AUTOKEN_API_KEY=sk_your_api_key

Basic Usage

Execute tasks using named arguments that map directly to the task's input schema:

# Arguments like --some-arg map to "someArg" in the input schema
autoken translate-text --text "Hello world" --target-lang es
# {"translatedText": "Hola mundo", "sourceLanguage": "en"}

autoken social-search --query "AI agents" --platforms reddit,x --limit 10

Positional Arguments

For tasks with a primary/default input, you can use positional arguments for cleaner commands:

# Instead of: autoken transcribe-audio --audio @./myfile.mp3
autoken transcribe-audio ./myfile.mp3
# {"text": "Hello and welcome to...", "duration": 245.2}

# The first positional arg maps to the task's default input
autoken translate-text "Hello world" --target-lang fr

Artifact Inputs (Files)

Use the --@ prefix on the argument name to upload files as artifact inputs:

# Upload a file as an artifact input (--@audio means "audio" field, file input)
autoken transcribe-audio --@audio ./interview.mp3

# Process an image
autoken image-to-text --@image ./screenshot.png

# PDF extraction
autoken pdf-extract --@pdf ./document.pdf

Artifact Outputs (Saving Files)

Use the @ suffix on the argument name to save artifact outputs to files:

# Save audio output to a file (--audio@ means "audio" field, save to file)
autoken text-to-speech --text "Hello World" --audio@ ./greeting.mp3
# Saved audio to ./greeting.mp3

# Generate and save an image
autoken generate-image --prompt "A sunset" --image@ ./sunset.png

Artifact Management

For more complex workflows, you can manage artifacts directly:

# Upload an artifact manually
autoken artifact upload ./myfile.mp3
# Prints: abc123def456...  (the artifact hash)

# Check if an artifact exists
autoken artifact exists abc123def456...
# Prints: true or false

# Download an artifact
autoken artifact download abc123def456... ./output.mp3

# List your artifacts
autoken artifact list

Other Commands

# Check your token balance
autoken balance

# List available tasks
autoken tasks
autoken tasks --category ai

# Natural language task execution
autoken natural "Translate 'hello world' to Spanish"

# Get help
autoken --help
autoken <task-name> --help

Argument Mapping

CLI arguments are automatically converted to match the task's input schema:

CLI Argument Schema Property
--text text
--target-lang targetLang
--source-language sourceLanguage
--max-tokens maxTokens

Arrays can be passed as comma-separated values:

autoken social-search --platforms reddit,x,linkedin

Configuration

client = Autoken(
    "ak_your_api_key",
    base_url="https://autoken.devrupt.io/api/v1",  # default
    timeout=30.0,  # 30 seconds, default
)

Service Worker (For Service Providers)

If you're building an Autoken service (provider), use the ServiceWorker class to handle task polling, execution, and result reporting:

from autoken import ServiceWorker, run_worker

# Minimal configuration - uses AUTOKEN_API_KEY and AUTOKEN_API_URL
# environment variables by default
worker = ServiceWorker(
    tasks=["my-task"],  # Task schemas are fetched from the API
)

# Register task handlers using decorator
@worker.task("my-task")
async def handle_my_task(job):
    # job.execution_id - unique execution ID
    # job.input - validated input from the task request
    # job.task_name - name of the task being executed
    # job.assignment_token - token for heartbeats/cancellation
    
    result = await do_something(job.input)
    return result  # Automatically reported as success

# Start polling for tasks
asyncio.run(worker.start())

Heartbeats for Long-Running Tasks

For tasks that take longer than 60 seconds, send periodic heartbeats to prevent the task from being reassigned:

@worker.task("long-running-task")
async def handle_long_task(job):
    total_items = len(job.input["items"])
    
    for i, item in enumerate(job.input["items"]):
        # Process each item...
        await process_item(item)
        
        # Send heartbeat with progress
        progress = int((i + 1) / total_items * 100)
        await worker.send_heartbeat(
            job.execution_id,
            job.assignment_token,
            progress=progress,
            message=f"Processing item {i + 1} of {total_items}"
        )
    
    return {"processed": total_items}

Cancelling Tasks

If your worker can't complete a task, cancel it so it can be reassigned:

@worker.task("unreliable-task")
async def handle_unreliable_task(job):
    if not await check_external_service():
        # Cancel the task - it will be reassigned to another provider
        await worker.cancel_task(
            job.execution_id,
            job.assignment_token,
            reason="External service unavailable"
        )
        return None  # Don't report a result
    
    return await do_work(job.input)

Graceful Shutdown

The ServiceWorker handles SIGTERM/SIGINT signals automatically, cancelling any in-progress tasks so they can be reassigned:

# Tasks are automatically cancelled on shutdown
await worker.start()

# Or manually stop with cancellation
await worker.stop()  # Cancels all active tasks

Configuration Options

The ServiceWorker constructor accepts the following options:

worker = ServiceWorker(
    # Service API key (defaults to AUTOKEN_API_KEY env var)
    api_key=None,
    # Base URL of the API (defaults to AUTOKEN_API_URL env var or production URL)
    base_url=None,
    # List of task names this service handles
    tasks=["my-task"],
    # Maximum jobs to fetch per poll (default: 5)
    poll_limit=5,
    # Request timeout in seconds (default: 30.0)
    timeout=30.0,
)

Provider Name

Provider names are derived from the owning user's account name, not configured in client code. This provides consistent branding and accountability across the platform. Your provider's display name is associated with your user account.

Artifact Handling

Download and upload artifacts for file-based tasks:

@worker.task("process-file")
async def handle_process_file(job):
    # Download input artifact
    file_buffer = await worker.download_artifact(job.input["file_id"])
    
    # Process the file...
    result = process_file(file_buffer)
    
    # Upload result artifact
    artifact_id = await worker.upload_artifact(
        result.buffer,
        "result.pdf",
        "application/pdf"
    )
    
    return {"file_id": artifact_id}

Lifecycle Methods

# Load task definitions from disk
await worker.load_tasks()

# Register tasks with the Autoken API
await worker.register_tasks()

# Start polling (calls load_tasks and register_tasks automatically)
await worker.start()

# Stop polling
await worker.stop()

License

MIT

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published