MigrationFromRest
This guide is for teams currently calling the Convert config and tracking
endpoints directly (plain HTTP with requests, httpx, curl, etc.) and
wanting to migrate to the SDK.
| Concern | Raw REST | SDK |
|---|---|---|
| Config fetch | HTTP GET on startup (or per-request) | Done once in `Core(SDKConfig(...)).initialize()` |
| Bucketing | Implemented manually or not at all | Deterministic in-process, no request-time network call |
| Audience rules | Evaluated on the Convert platform | Evaluated locally from config snapshot |
| Conversion POST | Manual JSON construction and POST | `context.track_conversion()` + `core.flush()` |
| Batching | Manual or absent | Automatic batching with configurable `batch_size` on `SDKConfig` |
| Deduplication | Manual per-visitor tracking | Built-in `(visitor_id, goal_id)` dedup via `DataStore` |
| Retries | Manual | Re-call `core.flush()` after a transport failure |
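
The "Retries" row says a failed flush is retried by calling `core.flush()` again. A minimal sketch of one way to wrap that in exponential backoff is shown here. `flush_with_retry` is a hypothetical helper, not part of the SDK, and this guide does not say which exception the SDK raises on a transport failure, so the exception types are a parameter (defaulting to `OSError` purely as a placeholder).

```python
import time
from typing import Callable, Tuple, Type


def flush_with_retry(
    flush: Callable[[], object],
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),  # assumption: adjust to the real error type
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> object:
    """Call flush() up to `attempts` times, doubling the delay between tries."""
    for attempt in range(attempts):
        try:
            return flush()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last failure
            sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")
```

With the SDK in place this would be called as `flush_with_retry(core.flush)`. The `sleep` parameter exists only so the delay can be replaced in tests.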
Raw REST:

```python
import os

import httpx

sdk_key = os.environ["CONVERT_SDK_KEY"]
response = httpx.get(
    f"https://cdn-4.convertexperiments.com/config/{sdk_key}",
    headers={"Accept": "application/json"},
)
response.raise_for_status()
project_config = response.json()
```

SDK equivalent:

```python
import os

from convert_sdk import Core, SDKConfig

core = Core(
    SDKConfig(
        sdk_key=os.environ["CONVERT_SDK_KEY"],
        environment="production",
    )
).initialize()
# core.current_config holds the parsed snapshot; no manual JSON parsing needed
```

Raw REST (manual bucketing):

```python
import mmh3


def bucket_visitor(experience_id, visitor_id, seed=9999):
    hash_input = f"{experience_id}{visitor_id}"
    hash_value = mmh3.hash(hash_input, seed) & 0xFFFFFFFF
    return int((hash_value / 4294967296) * 10000)


def select_variation(variations, bucket_value):
    # NOTE: traffic_allocation is a percentage (0-100); bucket_value is in
    # 0-9999. Multiply the allocation by 100 before comparing; the SDK does
    # the same in evaluation/bucketing.py.
    accumulated = 0.0
    for variation in variations:
        if variation.get("status") not in (None, "", "active", "running"):
            continue
        accumulated += float(variation["traffic_allocation"]) * 100
        if bucket_value < accumulated:
            return variation["id"]
    return None


experience = next(
    e for e in project_config["experiences"] if e["key"] == "checkout-flow"
)
bucket = bucket_visitor(experience["id"], "visitor-abc123")
variation_id = select_variation(experience["variations"], bucket)
```

SDK equivalent:

```python
from convert_sdk import Core, SDKConfig

core = Core(SDKConfig(data=project_config)).initialize()
context = core.create_context("visitor-abc123", visitor_attributes={"tier": "premium"})
result = context.run_experience("checkout-flow")
if result is not None:
    variation_id = result.variation_id
    # The SDK's internal bucket value is the same as the manual algorithm above
```

Note: `ExperienceResult` does not currently expose `bucket_value` as a public field. Use `diagnose_experience(key)` to inspect bucketing details:

```python
diag = context.diagnose_experience("checkout-flow")
print(diag.details)  # includes bucket_value when bucketing was attempted
```

Raw REST:

```python
import os

import httpx

payload = {
    "source": "python-sdk",
    "enrichData": True,
    "accountId": project_config["account_id"],
    "projectId": project_config["project"]["id"],
    "visitors": [
        {
            "visitorId": "visitor-abc123",
            "events": [
                {
                    "eventType": "conversion",
                    "data": {
                        "goalId": "goal-1",
                        "goalData": [{"key": "revenue", "value": 49.99}],
                        "bucketingData": {"exp-checkout": "var-treatment"},
                    },
                }
            ],
        }
    ],
}
sdk_key = os.environ["CONVERT_SDK_KEY"]
response = httpx.post(
    f"https://metrics.convertexperiments.com/v1/track/{sdk_key}",
    json=payload,
    headers={"Content-Type": "application/json"},
)
response.raise_for_status()  # fail loudly instead of silently dropping the event
```

SDK equivalent:

```python
from convert_sdk import ConversionStatus

result = context.track_conversion("purchase_completed", revenue=49.99)
if result.status is ConversionStatus.QUEUED:
    # events are queued; deliver them:
    core.flush()
```

The SDK builds the payload automatically from the config snapshot. The
`bucketingData` field is populated from the visitor's active bucketing
assignments at the moment of `track_conversion()`.
Raw REST integrations typically send all visitors to Convert for server-side bucketing or skip audience rules entirely. The SDK evaluates audience rules locally using the config snapshot, meaning:
- No extra network round-trip per request for bucketing decisions
- Audience rules are evaluated immediately, not deferred to the platform
The SDK accumulates events and sends them in configurable batches (default: 10
events per POST). Raw REST integrations typically POST one event per HTTP request.
Configure via SDKConfig(batch_size=25).
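
To make the batching behaviour concrete, here is a minimal sketch of splitting a queue of events into POST-sized groups. It illustrates the idea only and is not the SDK's internal implementation; `chunk_events` and the event dictionaries are hypothetical.

```python
from typing import Iterator, List, Sequence


def chunk_events(events: Sequence[dict], batch_size: int = 10) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `batch_size` events."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(events), batch_size):
        yield list(events[start:start + batch_size])


# 23 queued events with the default batch size become three POST bodies.
events = [{"goalId": f"goal-{i}"} for i in range(23)]
batch_lengths = [len(batch) for batch in chunk_events(events, batch_size=10)]
print(batch_lengths)  # [10, 10, 3]
```

A raw REST integration that wants the same effect would send one request per yielded batch instead of one per event.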
The SDK prevents double-counting the same goal for the same visitor within a
process lifetime. Raw REST integrations must implement deduplication manually.
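
For comparison, a raw REST integration would need something like the following to get per-process deduplication on the `(visitor_id, goal_id)` pair. This is a sketch under the assumption that an in-memory set is acceptable; `ConversionDeduper` is a hypothetical name and does not mirror the SDK's `DataStore` interface.

```python
from typing import Set, Tuple


class ConversionDeduper:
    """In-memory (visitor_id, goal_id) dedup for a single process."""

    def __init__(self) -> None:
        self._seen: Set[Tuple[str, str]] = set()

    def first_time(self, visitor_id: str, goal_id: str) -> bool:
        """Return True only the first time this pair is seen."""
        key = (visitor_id, goal_id)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


deduper = ConversionDeduper()
if deduper.first_time("visitor-abc123", "goal-1"):
    pass  # send the conversion POST here
```

Because the set lives in memory, the record is lost on restart and is not shared between worker processes. The class is also not thread-safe as written.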
Unknown goals return a typed ConversionResult with status == GOAL_NOT_FOUND
rather than raising an exception.
The SDK redacts visitor ids and SDK keys from all diagnostic logs. Raw REST integrations have to redact these themselves; request URLs that embed the SDK key are an easy place to leak it into logs.
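
If part of a service stays on raw REST during the migration, a small redaction helper can keep credentials and ids out of its logs. The sketch below is one possible approach, not a description of how the SDK redacts; both function names are hypothetical.

```python
import hashlib


def redact_visitor_id(visitor_id: str) -> str:
    """Replace a visitor id with a short, stable token that is not directly readable."""
    digest = hashlib.sha256(visitor_id.encode("utf-8")).hexdigest()
    return f"visitor:{digest[:8]}"


def redact_url(url: str, sdk_key: str) -> str:
    """Mask the SDK key wherever it appears in a URL or log message."""
    return url.replace(sdk_key, "***") if sdk_key else url


safe_url = redact_url("https://metrics.convertexperiments.com/v1/track/my-key", "my-key")
safe_id = redact_visitor_id("visitor-abc123")
```

The hashed token stays the same for a given visitor, so log lines can still be correlated without exposing the raw id.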
If you already fetch the project config and parse it, you can pass it directly to the SDK without changing your fetching logic:

```python
from convert_sdk import Core, SDKConfig

# your existing config fetch result
project_config = existing_fetch_function()
core = Core(SDKConfig(data=project_config, environment="production")).initialize()
```

See Initialization for the full `SDKConfig` reference.
The MVP is sync-first. An async public API (AsyncCore / AsyncContext)
and framework-specific helpers (convert-sdk-django,
convert-sdk-fastapi, convert-sdk-flask) are planned for Phase 3.
If you are migrating from REST today and your service is already async
(asyncio / httpx.AsyncClient), you can call the sync SDK from
async code via asyncio.to_thread() until the async surface ships;
see Async and Framework Integrations for the design intent.
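
The `asyncio.to_thread()` bridge could look roughly like this. To keep the sketch runnable without the SDK installed, `decide_variation` is a hypothetical stand-in for a blocking SDK call such as `context.run_experience(...)`. `asyncio.to_thread()` requires Python 3.9 or later.

```python
import asyncio
import time


def decide_variation(visitor_id: str) -> str:
    """Stand-in for a blocking SDK call (hypothetical, not the SDK API)."""
    time.sleep(0.01)  # simulate synchronous work
    return f"variation-for-{visitor_id}"


async def handle_request(visitor_id: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(decide_variation, visitor_id)


async def main() -> list:
    # Three concurrent "requests"; gather preserves argument order.
    return await asyncio.gather(*(handle_request(f"visitor-{i}") for i in range(3)))


results = asyncio.run(main())
```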
`asyncio.to_thread()` runs on the event loop's default executor, which on CPython is a `ThreadPoolExecutor` capped at `min(32, os.cpu_count() + 4)` workers by default. Under a FastAPI service receiving hundreds of concurrent requests, that cap can queue SDK calls behind one another and produce bimodal latency. If you adopt the `to_thread()` bridge, raise the executor size at startup, from code that runs inside the event loop:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def configure_executor() -> None:
    # Call once from an async startup hook, so a running loop exists.
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=128))
```

Alternatively, move SDK calls to a dedicated executor sized for your concurrency.
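
The dedicated-executor option could be sketched as follows. The pool size of 64 and the names `SDK_EXECUTOR`, `blocking_sdk_call`, and `handle_request_dedicated` are illustrative assumptions; `blocking_sdk_call` stands in for a real synchronous SDK call.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# A pool reserved for SDK work, so other to_thread() users cannot starve it.
SDK_EXECUTOR = ThreadPoolExecutor(max_workers=64, thread_name_prefix="convert-sdk")


def blocking_sdk_call(visitor_id: str) -> str:
    """Stand-in for a synchronous SDK call (hypothetical)."""
    return visitor_id.upper()


async def handle_request_dedicated(visitor_id: str) -> str:
    loop = asyncio.get_running_loop()
    # run_in_executor accepts an explicit executor instead of the loop default.
    return await loop.run_in_executor(SDK_EXECUTOR, blocking_sdk_call, visitor_id)


result = asyncio.run(handle_request_dedicated("visitor-abc123"))
SDK_EXECUTOR.shutdown(wait=True)  # in a service, do this in the shutdown hook
```

Keeping the pool separate leaves the loop's default executor untouched, which avoids side effects on unrelated code that also relies on it.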
- Initialization: SDK key and direct config options
- Code Examples: how the SDK replaces manual bucketing and POST construction
- Diagnostics: log structure, error codes, `diagnose_*` helpers
Copyrights © 2025 All Rights Reserved by Convert Insights, Inc.