Build trust and reliability into your AI applications with comprehensive telemetry and observability.
The RobotForge Python SDK is your first step toward building trustworthy and reliable AI applications. In a world where AI systems make critical decisions, having complete visibility into your model calls, tool executions, and agent behaviors isn't just nice to have—it's essential.
Debug with Confidence: When your AI application behaves unexpectedly, pinpoint exactly where things went wrong. See the complete chain of model calls, tool executions, and decision points that led to any outcome.
Optimize Performance: Identify bottlenecks in your AI workflows. Track token usage, response times, and costs across different models and configurations to make data-driven optimization decisions.
Build Trust: Provide stakeholders with complete transparency into your AI systems. Show exactly what inputs were processed, what decisions were made, and what outputs were generated—critical for compliance, auditing, and accountability.
Monitor in Production: Detect issues before your users do. Real-time monitoring of model performance, error rates, and unexpected behaviors helps you maintain high-quality AI experiences.
Understand Usage Patterns: Gain insights into how your AI features are actually being used. Which models are most popular? Which tools get called most frequently? Where are users experiencing friction?
The RobotForge SDK makes all of this possible with minimal code changes and zero impact on your application's performance.
```bash
pip install robotforge-python-sdk
```

Requirements:
- Python 3.8+
- `aiohttp` for async HTTP requests
- `requests` for synchronous operations
```python
from telemetry_sdk.client import TelemetryClient

# Initialize the SDK
client = TelemetryClient(
    api_key="your-api-key",
    endpoint="https://cloud.robotforge.com.ng",
    project_id="your-project-id"
)
```

```python
import asyncio
from telemetry_sdk.client import TelemetryClient

async def main():
    client = TelemetryClient(
        api_key="your-api-key",
        project_id="my-ai-app"
    )

    # Trace a model call
    async with client.trace_model_call(
        provider="openai",
        model="gpt-4"
    ) as span:
        span.set_input("What is the capital of France?")

        # Your AI logic here
        response = "Paris"

        span.set_output(response)
        span.set_tokens(150)

asyncio.run(main())
```

Track every interaction with your language models—inputs, outputs, tokens, costs, and performance metrics.
```python
async with client.trace_model_call(
    provider="openai",
    model="gpt-4-turbo",
    temperature=0.7
) as span:
    span.set_input(user_prompt)

    # Make your model call
    response = await openai.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": user_prompt}],
        temperature=0.7
    )

    # Capture the results
    span.set_output(response.choices[0].message.content)
    span.set_tokens(response.usage.total_tokens)
    span.set_cost(calculate_cost(response.usage))
```

```python
with client.trace_model_call_sync(
    provider="anthropic",
    model="claude-3-sonnet"
) as span:
    span.set_input(prompt)

    # Synchronous model call
    response = anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        messages=[{"role": "user", "content": prompt}]
    )

    span.set_output(response.content[0].text)
    span.set_tokens(response.usage.input_tokens + response.usage.output_tokens)
```

```python
@client.trace_model_call_decorator(
    provider="openai",
    model="gpt-4"
)
async def generate_summary(text: str) -> str:
    response = await openai.chat.completions.create(
        model="gpt-4",
        messages=[{
            "role": "user",
            "content": f"Summarize this text: {text}"
        }]
    )
    return response.choices[0].message.content

# Use it like any other function
summary = await generate_summary(long_article)
```

Available Methods:
- `set_input(text)` - Set the input prompt or message
- `set_output(text)` - Set the model's response
- `set_tokens(count)` - Set the total token count
- `set_cost(amount)` - Set the cost in your currency
- `set_temperature(temp)` - Set the temperature parameter
- `set_provider(name)` - Set the model provider
- `set_model(name)` - Set the model name
- `set_metadata(key, value)` - Add custom metadata
Monitor external tool calls, API requests, database queries, and any other operations your AI agents perform.
```python
async with client.trace_tool_execution(
    tool_name="web_search",
    action="search",
    endpoint="https://api.search.com/v1/search",
    http_method="POST"
) as span:
    span.set_input(search_query)

    # Execute your tool
    results = await search_api.search(
        query=search_query,
        limit=10
    )

    span.set_output(str(results))
    span.set_http_status_code(200)
```

```python
with client.trace_tool_execution_sync(
    tool_name="database_query",
    action="fetch_user_data"
) as span:
    span.set_input(f"user_id: {user_id}")

    # Execute the database query
    # (don't interpolate user input into SQL in production - use parameterized queries!)
    user_data = db.query(
        f"SELECT * FROM users WHERE id = {user_id}"
    )

    span.set_output(str(user_data))
```

```python
@client.trace_tool_execution_decorator(
    tool_name="weather_api",
    action="get_forecast"
)
async def get_weather(city: str) -> dict:
    response = await weather_client.get_forecast(city)
    return response.json()

# Automatically traced
weather = await get_weather("San Francisco")
```

Available Methods:
- `set_input(text)` - Set the tool input/parameters
- `set_output(text)` - Set the tool output/results
- `set_action(name)` - Set the action name
- `set_endpoint(url)` - Set the API endpoint
- `set_http_method(method)` - Set the HTTP method (GET, POST, etc.)
- `set_http_status_code(code)` - Set the HTTP status code
- `set_request_payload(data)` - Set the request payload
- `set_response_payload(data)` - Set the response payload
- `set_metadata(key, value)` - Add custom metadata
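The payload setters don't appear in the examples above, so here is a minimal sketch of how they might be combined with the other span methods. The `api_client` helper and the endpoint URL are hypothetical placeholders, not part of the SDK:

```python
# Hypothetical tool trace capturing full request/response payloads.
# `api_client` is a placeholder HTTP helper, not part of the SDK.
request_body = {"query": "laptops", "limit": 5}

async with client.trace_tool_execution(
    tool_name="product_search",
    action="search",
    endpoint="https://api.example.com/search",
    http_method="POST"
) as span:
    span.set_request_payload(request_body)

    response_body = await api_client.post(
        "https://api.example.com/search", json=request_body
    )

    span.set_response_payload(response_body)
    span.set_http_status_code(200)
```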
The SDK provides both synchronous and asynchronous context managers for all tracing operations, giving you flexibility to use them in any Python environment.
Best for modern async applications using asyncio, FastAPI, or other async frameworks.
```python
async def process_request(user_input: str):
    # Model call
    async with client.trace_model_call(
        provider="openai",
        model="gpt-4"
    ) as model_span:
        model_span.set_input(user_input)
        response = await call_model(user_input)
        model_span.set_output(response)

    # Tool execution
    async with client.trace_tool_execution(
        tool_name="data_processor"
    ) as tool_span:
        tool_span.set_input(response)
        result = await process_data(response)
        tool_span.set_output(result)

    return result
```

Perfect for traditional synchronous Python applications, scripts, and notebooks.
```python
def analyze_text(text: str):
    # Model call
    with client.trace_model_call_sync(
        provider="anthropic",
        model="claude-3-opus"
    ) as model_span:
        model_span.set_input(text)
        analysis = call_claude(text)
        model_span.set_output(analysis)

    # Tool execution
    with client.trace_tool_execution_sync(
        tool_name="sentiment_analyzer"
    ) as tool_span:
        tool_span.set_input(analysis)
        sentiment = analyze_sentiment(analysis)
        tool_span.set_output(sentiment)

    return sentiment
```

You can use both patterns in the same application depending on your needs:
```python
class AIService:
    def __init__(self, client):
        self.client = client

    # Async method
    async def async_process(self, input_data):
        async with self.client.trace_model_call(...) as span:
            return await self._async_call(input_data)

    # Sync method
    def sync_process(self, input_data):
        with self.client.trace_model_call_sync(...) as span:
            return self._sync_call(input_data)
```

When to use which:
- Use the async context managers (`trace_model_call`, `trace_tool_execution`) with `async`/`await` code
- Use the sync context managers (`trace_model_call_sync`, `trace_tool_execution_sync`) with regular synchronous code
- The SDK handles timing, error tracking, and event submission automatically in both cases
Decorators provide a clean, reusable way to add tracing to your functions without modifying their internal logic.
```python
@client.trace_model_call_decorator(
    provider="openai",
    model="gpt-4-turbo"
)
async def generate_response(prompt: str) -> str:
    """This function is automatically traced."""
    response = await openai.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Works with sync functions too
@client.trace_model_call_decorator(
    provider="anthropic",
    model="claude-3-sonnet"
)
def sync_generate(prompt: str) -> str:
    response = anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text
```

```python
@client.trace_tool_execution_decorator(
    tool_name="calculator",
    action="compute"
)
async def calculate(expression: str) -> float:
    """Automatically traced calculator tool."""
    return eval(expression)  # Don't use eval in production!

@client.trace_tool_execution_decorator(
    tool_name="web_scraper",
    action="fetch_page"
)
def scrape_website(url: str) -> str:
    response = requests.get(url)
    return response.text
```

- Clean Code: Tracing logic is separated from business logic
- Reusable: Apply the same tracing configuration to multiple functions
- Maintainable: Easy to add or remove tracing without changing function internals
- Type-Safe: Preserves function signatures and type hints
```python
# Combine multiple decorators
@client.trace_model_call_decorator(provider="openai", model="gpt-4")
@retry(max_attempts=3)
@cache(ttl=3600)
async def cached_llm_call(prompt: str) -> str:
    return await call_llm(prompt)

# Custom metadata
@client.trace_tool_execution_decorator(
    tool_name="database",
    action="query",
    meta_database="postgres",
    meta_query_type="select"
)
def query_db(sql: str) -> list:
    return db.execute(sql)
```

Decorator Methods:
- `trace_model_call_decorator(**kwargs)` - Trace model calls
- `trace_tool_execution_decorator(tool_name, **kwargs)` - Trace tool executions
- Both work with async and sync functions automatically
Every traced event automatically captures the following (an illustrative event payload is sketched after this list):
- Timing: Start time, end time, latency
- Status: Success, error, or pending
- Session Info: Session ID, user ID, tenant ID
- Component: Source component identifier
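For illustration, a flushed event might serialize to something like this. The field names here are assumptions for the sake of the example, not the SDK's exact schema:

```python
# Illustrative event shape - field names are assumptions, not the SDK's actual schema
event = {
    "start_time": "2025-10-29T12:00:00Z",
    "end_time": "2025-10-29T12:00:01.250Z",
    "latency_ms": 1250,
    "status": "success",           # success, error, or pending
    "session_id": "sess_abc123",   # auto-generated if not provided
    "user_id": "user_123",
    "tenant_id": "my-tenant",
    "component": "model_call",     # source component identifier
}
```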
Add custom metadata to any trace for additional context:
```python
async with client.trace_model_call(
    provider="openai",
    model="gpt-4"
) as span:
    span.set_input(prompt)

    # Add custom metadata
    span.set_metadata("user_tier", "premium")
    span.set_metadata("feature_flag", "new_ui_v2")
    span.set_metadata("experiment_id", "exp_123")
    span.set_metadata("request_id", request.id)

    response = await call_model(prompt)
    span.set_output(response)
```

Errors are automatically captured and logged:
```python
async with client.trace_model_call(...) as span:
    try:
        span.set_input(prompt)
        response = await call_model(prompt)
        span.set_output(response)
    except Exception as e:
        # The error is automatically recorded
        span.set_metadata("error_type", type(e).__name__)
        span.set_metadata("error_details", str(e))
        raise
```

```python
from telemetry_sdk.client import TelemetryClient

client = TelemetryClient(
    # Required
    api_key="your-api-key",

    # Optional
    endpoint="https://api.robotforge.com.ng",  # Default endpoint
    project_id="my-project",
    tenant_id="my-tenant",
    application_id="my-app",

    # Session management
    session_id=None,  # Auto-generated if not provided
    user_id="user_123",

    # Performance tuning
    batch_size=10,        # Number of events to batch before sending
    flush_interval=5.0,   # Seconds between automatic flushes
    max_retries=3,        # Retry attempts for failed requests

    # Debugging
    debug=False  # Enable debug logging
)
```

You can also configure the SDK using environment variables:
```bash
export ROBOTFORGE_API_KEY="your-api-key"
export ROBOTFORGE_ENDPOINT="https://api.robotforge.com.ng"
export ROBOTFORGE_PROJECT_ID="my-project"
export ROBOTFORGE_DEBUG="true"
```

```python
# The SDK will automatically use the environment variables
client = TelemetryClient()
```
```python
from telemetry_sdk.client import TelemetryClient
import openai

class AIChatbot:
    def __init__(self, api_key: str, forge_key: str):
        self.openai = openai.AsyncOpenAI(api_key=api_key)
        self.forge = TelemetryClient(api_key=forge_key)

    async def chat(self, user_message: str, conversation_history: list) -> str:
        # Trace the model call
        async with self.forge.trace_model_call(
            provider="openai",
            model="gpt-4-turbo",
            temperature=0.7
        ) as span:
            # Prepare the prompt
            messages = conversation_history + [
                {"role": "user", "content": user_message}
            ]
            span.set_input(str(messages))

            # Call the model
            response = await self.openai.chat.completions.create(
                model="gpt-4-turbo",
                messages=messages,
                temperature=0.7
            )

            # Extract the response
            assistant_message = response.choices[0].message.content

            # Record telemetry
            span.set_output(assistant_message)
            span.set_tokens(response.usage.total_tokens)
            span.set_metadata("conversation_length", len(messages))

            return assistant_message

# Usage
bot = AIChatbot(
    api_key="openai_key",
    forge_key="forge_key"
)
response = await bot.chat(
    "What's the weather like?",
    conversation_history=[]
)
```
```python
from telemetry_sdk.client import TelemetryClient
import aiohttp

class AIAgent:
    def __init__(self, forge_key: str):
        self.forge = TelemetryClient(api_key=forge_key)

    async def research_topic(self, topic: str) -> dict:
        # Step 1: Search the web
        search_results = await self._web_search(topic)

        # Step 2: Analyze the results with an LLM
        analysis = await self._analyze_results(topic, search_results)

        return {
            "topic": topic,
            "search_results": search_results,
            "analysis": analysis
        }

    async def _web_search(self, query: str) -> list:
        async with self.forge.trace_tool_execution(
            tool_name="web_search",
            action="search",
            endpoint="https://api.search.com/v1/search"
        ) as span:
            span.set_input(query)

            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://api.search.com/v1/search",
                    params={"q": query}
                ) as response:
                    results = await response.json()
                    span.set_output(str(results))
                    span.set_http_status_code(response.status)
                    return results

    async def _analyze_results(self, topic: str, results: list) -> str:
        async with self.forge.trace_model_call(
            provider="anthropic",
            model="claude-3-opus"
        ) as span:
            prompt = f"Analyze these search results about {topic}: {results}"
            span.set_input(prompt)

            # Call Claude
            analysis = await call_claude(prompt)

            span.set_output(analysis)
            span.set_metadata("num_results", len(results))
            return analysis

# Usage
agent = AIAgent(forge_key="forge_key")
report = await agent.research_topic("quantum computing")
```
```python
from telemetry_sdk.client import TelemetryClient
from typing import List
import asyncio

async def process_documents(documents: List[str], forge_key: str):
    client = TelemetryClient(api_key=forge_key)

    async def process_one(doc: str):
        async with client.trace_model_call(
            provider="openai",
            model="gpt-4"
        ) as span:
            span.set_input(doc)
            span.set_metadata("doc_length", len(doc))

            summary = await summarize(doc)

            span.set_output(summary)
            span.set_metadata("summary_length", len(summary))
            return summary

    # Process all documents in parallel
    summaries = await asyncio.gather(*[
        process_one(doc) for doc in documents
    ])

    # Ensure all events are sent
    await client.flush()
    return summaries

# Usage
docs = [doc1, doc2, doc3, ...]
summaries = await process_documents(docs, "forge_key")
```
```python
async def main():
    client = TelemetryClient(api_key="key")
    try:
        # Your application logic
        await do_work(client)
    finally:
        # Ensure all events are sent
        await client.flush()

asyncio.run(main())
```

Context managers automatically handle timing and error tracking:
```python
# ✅ Good - Automatic timing and error handling
async with client.trace_model_call(...) as span:
    result = await call_model()
    span.set_output(result)

# ❌ Avoid - Manual event building is error-prone
builder = client.create_model_call_event()
start = time.time()
result = await call_model()
builder.set_latency_ms(int((time.time() - start) * 1000))
await builder.send()
```

```python
async with client.trace_model_call(...) as span:
    span.set_input(prompt)

    # Add context that will help with debugging
    span.set_metadata("user_tier", user.tier)
    span.set_metadata("feature_version", "v2")
    span.set_metadata("ab_test_variant", "control")

    response = await call_model(prompt)
    span.set_output(response)
```

```python
async with client.trace_model_call(...) as span:
    try:
        span.set_input(prompt)
        response = await call_model(prompt)
        span.set_output(response)
    except RateLimitError as e:
        span.set_metadata("error", "rate_limit")
        span.set_metadata("retry_after", e.retry_after)
        raise
    except Exception as e:
        span.set_metadata("error", str(e))
        raise
```

```python
# ✅ Good - Clean and reusable
@client.trace_model_call_decorator(provider="openai", model="gpt-4")
async def generate_text(prompt: str) -> str:
    return await call_openai(prompt)

# ❌ Avoid - Repeated tracing code
async def generate_text(prompt: str) -> str:
    async with client.trace_model_call(...) as span:
        span.set_input(prompt)
        result = await call_openai(prompt)
        span.set_output(result)
        return result
```

We're actively developing additional features to make the RobotForge SDK even more powerful:
- Agent Action Tracing: Full support for tracing complex agent behaviors, planning, and reasoning
- Multi-Agent Coordination: Track interactions between multiple AI agents
- Decision Trees: Visualize agent decision-making processes
- Zero-Code Setup: Automatically instrument popular AI frameworks
- Supported Libraries:
- OpenAI (GPT models)
- Anthropic (Claude models)
- LangChain
- LlamaIndex
- Hugging Face Transformers
- Drop-in Integration: Single line of code to enable comprehensive tracing
- Vector Databases: Pinecone, Weaviate, Qdrant
- Frameworks: FastAPI, Flask, Django middleware
- Observability Platforms: OpenTelemetry, Datadog, New Relic
- Cloud Platforms: AWS Bedrock, Google Vertex AI, Azure OpenAI
We'll be publishing our detailed roadmap soon with dates, features, and opportunities for community input. Stay tuned to:
- GitHub: github.com/robotforge/python-sdk
- Documentation: docs.robotforge.com.ng
- Blog: robotforge.com.ng/blog
Want to influence our roadmap? Join our community and share your feedback!
Problem: Events aren't showing up in your dashboard.
Solutions:
- Check that you're calling `await client.flush()` before your application exits
- Verify that your API key and endpoint are correct
- Enable debug mode: `TelemetryClient(api_key="key", debug=True)`
- Check for network connectivity issues
```python
# Enable debugging
client = TelemetryClient(
    api_key="key",
    debug=True  # Prints detailed logs
)
```

Problem: Adding telemetry is slowing down your application.
Solutions:
- Increase the batch size to reduce network calls:

```python
client = TelemetryClient(
    api_key="key",
    batch_size=50,        # Send 50 events at once
    flush_interval=10.0   # Flush every 10 seconds
)
```

- Use async context managers for better performance
- Consider sampling in high-traffic scenarios, e.g. tracing only 10% of requests (see the sketch after this list)
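A minimal sampling sketch: trace only a fraction of requests and fall back to a no-op span for the rest. The `_NoopSpan` shim and `maybe_trace_model_call` helper are hypothetical, not part of the SDK:

```python
import random

SAMPLE_RATE = 0.10  # Trace roughly 10% of requests

class _NoopSpan:
    """Hypothetical no-op span so untraced requests share the same code path."""
    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        return False

    def __getattr__(self, name):
        # Silently absorb set_input/set_output/etc. calls
        return lambda *args, **kwargs: None

def maybe_trace_model_call(client, **kwargs):
    if random.random() < SAMPLE_RATE:
        return client.trace_model_call(**kwargs)
    return _NoopSpan()

async def handle(prompt: str):
    async with maybe_trace_model_call(client, provider="openai", model="gpt-4") as span:
        span.set_input(prompt)
        response = await call_model(prompt)
        span.set_output(response)
        return response
```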
Problem: Memory usage is growing over time.
Solutions:
- Ensure you're calling `flush()` regularly
- Reduce the batch size for long-running applications
- Avoid storing large objects in metadata
```python
# For long-running services
async def periodic_flush(client):
    while True:
        await asyncio.sleep(60)  # Flush every minute
        await client.flush()

# Run in the background
asyncio.create_task(periodic_flush(client))
```
```python
class TelemetryClient:
    def __init__(
        self,
        api_key: str,
        endpoint: str = "https://api.robotforge.com.ng",
        project_id: str = None,
        tenant_id: str = None,
        application_id: str = None,
        session_id: str = None,
        user_id: str = None,
        batch_size: int = 10,
        flush_interval: float = 5.0,
        max_retries: int = 3,
        debug: bool = False
    )
```

Async:
- `trace_model_call(**kwargs)` → `AsyncContextManager[Span]`
- `trace_tool_execution(tool_name: str, **kwargs)` → `AsyncContextManager[Span]`
Sync:
- `trace_model_call_sync(**kwargs)` → `ContextManager[Span]`
- `trace_tool_execution_sync(tool_name: str, **kwargs)` → `ContextManager[Span]`
Decorators:
- `trace_model_call_decorator(**kwargs)` → `Callable`
- `trace_tool_execution_decorator(tool_name: str, **kwargs)` → `Callable`
Common:
- `set_input(text: str)` - Set input data
- `set_output(text: str)` - Set output data
- `set_metadata(key: str, value: Any)` - Add custom metadata
- `set_status(status: EventStatus)` - Set the event status
Model Call Specific:
- `set_provider(name: str)` - Set the model provider
- `set_model(name: str)` - Set the model name
- `set_tokens(count: int)` - Set the token count
- `set_cost(amount: float)` - Set the cost
- `set_temperature(temp: float)` - Set the temperature
Tool Execution Specific:
- `set_action(name: str)` - Set the action name
- `set_endpoint(url: str)` - Set the API endpoint
- `set_http_method(method: str)` - Set the HTTP method
- `set_http_status_code(code: int)` - Set the status code
- `set_request_payload(data: dict)` - Set the request data
- `set_response_payload(data: dict)` - Set the response data
- `flush() → None` - Send all pending events immediately
- `close() → None` - Close the client and clean up resources
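A minimal shutdown sketch combining the two; this assumes `close()` is awaitable like `flush()`, which may differ in your SDK version:

```python
# Assumes close() is awaitable like flush(); adjust if your version is synchronous.
async def shutdown(client):
    await client.flush()  # Send any pending events
    await client.close()  # Release network resources
```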
- Full Docs: docs.robotforge.com.ng
- API Reference: docs.robotforge.com.ng/api
- Examples: github.com/robotforge/examples
- GitHub Issues: github.com/robotforge/python-sdk/issues
- Discord: discord.gg/robotforge
- Stack Overflow: Tag your questions with `robotforge`
For enterprise support, SLAs, and custom solutions:
- Email: enterprise@robotforge.com.ng
- Schedule a Call: robotforge.com.ng/contact
The RobotForge Python SDK is released under the MIT License. See LICENSE file for details.
Built with ❤️ by the RobotForge team. Special thanks to our early adopters and community contributors who helped shape this SDK.
Version 1.0.0 | Last Updated: 29 October 2025