-
Notifications
You must be signed in to change notification settings - Fork 0
CodeExamples
Complete, runnable examples for every public method on Core and Context.
Each example focuses on one method or concern — combine them freely. For config
field details, see Configuration; for type definitions, see
Type Hints.
Sections:
- Initialization
- Creating a visitor context
- Experience evaluation
- Feature evaluation
- Segments
- Conversion tracking
- Queue control and flush
- Lifecycle event subscription
- Diagnostics
- Graceful shutdown
from convert_sdk import Core, SDKConfig
core = Core(
SDKConfig(
data={
"account_id": "1001",
"project": {"id": "2002", "name": "Demo"},
"experiences": [],
"features": [],
"goals": [],
}
)
).initialize()
assert core.is_ready
core.close()Read the key from the environment — never hard-code credentials:
import os
from convert_sdk import Core, SDKConfig, TransportConfig
core = Core(
SDKConfig(
sdk_key=os.environ["CONVERT_SDK_KEY"],
environment="production",
transport=TransportConfig(timeout=5.0),
)
).initialize()
assert core.is_ready
core.close()Use the context-manager form so close() is always called, even on error:
from convert_sdk import Core, SDKConfig
with Core(SDKConfig(data={"account_id": "1001", "project": {"id": "2"}, "experiences": [], "features": [], "goals": []})).initialize() as core:
context = core.create_context("visitor-001")
result = context.run_experience("checkout-experiment")
# core.close() called automatically on exitimport os
from convert_sdk import Core, SDKConfig, RefreshConfig, LifecycleEvent
core = Core(
SDKConfig(
sdk_key=os.environ["CONVERT_SDK_KEY"],
refresh=RefreshConfig(
interval_seconds=300.0, # refresh every 5 minutes
jitter_seconds=30.0, # avoid thundering herd across instances
backoff_factor=2.0, # exponential backoff on transient failures
backoff_max_seconds=600.0, # never back off longer than 10 minutes
),
)
).initialize()
# Subscribe to config-swap events (e.g. to bust a downstream cache):
core.on(LifecycleEvent.CONFIG_UPDATED, lambda payload, error=None: print("config swapped"))
# Trigger an immediate out-of-band refresh (fire-and-forget):
core.refresh_now()
core.close() # stops the background refresh threadcore.current_config returns the current immutable ConfigSnapshot, or None
if not yet initialized. It is safe to call from any thread:
snapshot = core.current_config
if snapshot is not None:
print(snapshot.project_id, snapshot.account_id)core.create_context(visitor_id) returns a Context bound to the current
immutable snapshot. Attributes are copied defensively:
context = core.create_context(
"visitor-abc",
visitor_attributes={"country": "US", "plan": "pro"},
)
# Read stored attributes back:
print(context.visitor_id) # "visitor-abc"
print(dict(context.visitor_attributes)) # {"country": "US", "plan": "pro"}
print(dict(context.attributes)) # same as visitor_attributes (alias)
print(dict(context.default_segments)) # {} — segments are separateReuse the same Context within a request. Create a new one per request (or per
visitor interaction) to pick up the latest config.
set_attributes merges new keys into the stored state and persists through the
configured DataStore. Later create_context calls for the same visitor
rehydrate the merged state:
context.set_attributes({"plan": "enterprise", "beta": True})
# context.visitor_attributes now includes all prior keys plus the update.run_experience returns ExperienceResult | None. A None result is a normal,
non-exceptional outcome — audience miss, outside traffic allocation, or
experience not found:
from convert_sdk import ExperienceResult
result = context.run_experience("checkout-experiment")
if result is None:
pass # visitor did not qualify — not an error
else:
assert isinstance(result, ExperienceResult)
print(result.experience_key) # "checkout-experiment"
print(result.experience_id) # e.g. "100342"
print(result.variation_key) # e.g. "treatment" (may be None if unset in config)
print(result.variation_id) # e.g. "200512"
# result.variation is a read-only mapping of the raw variation configattributes= is an ephemeral overlay for a single call. It does not mutate the
context's stored visitor_attributes:
# Stored attribute: country="US"
result = context.run_experience(
"geo-experiment",
attributes={"country": "DE"}, # this call only; context unchanged
)
assert context.visitor_attributes["country"] == "US"run_experiences() evaluates every active experience in the config and returns
a list of results for those the visitor qualifies for. The list is empty (not
None) when no experience resolves:
results = context.run_experiences()
for r in results:
print(r.experience_key, "->", r.variation_key)Both run_experience and run_experiences accept location_attributes= as a
keyword-only argument for location-rule qualification:
result = context.run_experience(
"landing-page-test",
location_attributes={"url": "/pricing"},
)Bucketing a visitor enqueues a bucketing/activation event so the visitor is
counted in the experiment's exposure denominator. Pass enable_tracking=False
(keyword-only, default True) to evaluate without reporting it — bucketing,
sticky persistence, audience-rule evaluation, and the internal BUCKETING
lifecycle event still fire; only the outbound network enqueue is skipped (useful
for consent-denied flows or previewing a variation before applying it):
variation = context.run_experience("homepage-test", enable_tracking=False)
context.run_experiences(enable_tracking=False)Python's tracking control is per-call only — there is no global tracking switch. Leave
enable_trackingat itsTruedefault for production traffic, or your experiment's exposure count goes to zero and you lose the conversion-rate denominator.
run_feature resolves a feature flag and its typed variables from the visitor's
selected variation. It returns FeatureResult | None — None when the feature
is not declared, the visitor is not bucketed into any variation carrying the
feature, or the feature is otherwise unavailable:
from convert_sdk import FeatureStatus
feature = context.run_feature("checkout-banner")
if feature is None:
pass # feature unavailable or visitor not bucketed into it
else:
assert feature.status is FeatureStatus.ENABLED
print(feature.feature_key) # "checkout-banner"
print(feature.feature_id) # the feature's config id
print(feature.experience_key) # which experience supplied the change
print(feature.variation_key) # the selected variation's key
# Variables are type-cast from the feature's declared variable types:
headline = feature.variables.get("headline") # str
enabled = feature.variables.get("enabled") # bool
max_items = feature.variables.get("max_items") # intrun_features() resolves every declared feature the visitor buckets into,
returning a list (empty if none):
for f in context.run_features():
print(f.feature_key, f.status.value, dict(f.variables))Both run_feature and run_features accept attributes= and
location_attributes= overlays, identical to the experience methods.
Default segments feed reporting and conversion attribution. They are kept
strictly separate from visitor_attributes — do not mix them. Use
set_segments to associate default segments with the visitor:
context.set_segments({"customerType": "vip", "loyalty_tier": "gold"})
print(dict(context.default_segments))
# {"customerType": "vip", "loyalty_tier": "gold"}Like set_attributes, set_segments merges into the stored state and persists
through the DataStore. The next create_context for the same visitor
rehydrates both attributes and segments.
run_custom_segments evaluates named segment rules against the visitor and
records newly matched segment IDs in the visitor's default-segment state.
Returns a typed CustomSegmentsResult — never None, never raises on a miss:
from convert_sdk import CustomSegmentsResult
# rule_data is an ephemeral per-call overlay — never written back to
# visitor_attributes:
result = context.run_custom_segments(
["us-visitors", "premium-plan"],
rule_data={"country": "US", "plan": "pro"},
)
assert isinstance(result, CustomSegmentsResult)
# result.matched_segment_ids: tuple[str, ...] — IDs newly matched THIS call
# result.matched: bool convenience property
print(result.matched_segment_ids) # e.g. ("seg_us_abc",)
print(result.matched) # True
# A no-match is a typed empty result, not an exception:
miss = context.run_custom_segments(["us-visitors"], rule_data={"country": "DE"})
assert miss.matched is False
assert miss.matched_segment_ids == ()Already-matched segment IDs are not re-added on subsequent calls.
track_conversion is synchronous and lightweight — it resolves the goal,
deduplicates, and appends to the in-process queue. No network call happens.
It always returns a typed ConversionResult:
from convert_sdk import ConversionStatus
result = context.track_conversion("purchase_completed")
# Inspect the outcome without exceptions:
if result.status is ConversionStatus.QUEUED:
print("queued:", result.tracked) # True
elif result.status is ConversionStatus.DEDUPLICATED:
print("duplicate:", result.reason) # "deduplicated"
elif result.status is ConversionStatus.GOAL_NOT_FOUND:
print("missing:", result.reason) # "goal_not_found"ConversionStatus |
tracked |
reason |
Meaning |
|---|---|---|---|
QUEUED |
True |
None |
Goal resolved; event enqueued. |
DEDUPLICATED |
False |
"deduplicated" |
Same (visitor, goal) already tracked; no second event. |
GOAL_NOT_FOUND |
False |
"goal_not_found" |
Goal key absent from loaded config; typed non-exception outcome. |
result = context.track_conversion(
"purchase_completed",
revenue=49.99,
)
assert result.tracked is Trueconversion_data values must be JSON primitives (int, float, str).
Objects, lists, and booleans raise ConversionDataError (programmer misuse):
result = context.track_conversion(
"purchase_completed",
revenue=149.00,
conversion_data={
"transaction_id": "txn-abc-123",
"items_count": 3,
},
)By default, deduplication is keyed by (visitor_id, goal_id) — a differing
revenue or conversion_data does not defeat it. Use force_multiple=True
to re-track an already-tracked goal (e.g. recording multiple revenue
transactions in one session):
# First purchase:
context.track_conversion("purchase_completed", revenue=49.99)
# Second purchase in the same session:
again = context.track_conversion(
"purchase_completed",
revenue=29.99,
conversion_data={"transaction_id": "txn-2"},
force_multiple=True, # overrides default-mode dedup
)
assert again.tracked is TrueA tracked conversion carries the visitor's attribution context at conversion
time: active default segments (set_segments) and active variation/bucketing
assignments. Call set_segments and run_experience/run_feature before
track_conversion so the conversion is attributed correctly.
track_conversion never sends to the network — it appends to an in-process
batch queue. Use core.flush() to deliver queued events explicitly:
context.track_conversion("purchase_completed", revenue=49.99)
context.track_conversion("add_to_cart")
core.flush() # delivers all queued events and clears the queue
core.flush() # safe no-op on an empty queue — no transport call, no errorSet batch_size so the queue self-releases when it fills. This bounds
in-process memory and latency without per-call flush() calls:
from convert_sdk import Core, SDKConfig
core = Core(SDKConfig(data=project_config, batch_size=25)).initialize()
# The queue releases automatically once 25 events accumulate.auto_flush_interval_ms starts a daemonic timer for long-lived server
processes. Leave as None for Lambda, cron jobs, and other short-lived
runtimes:
from convert_sdk import Core, SDKConfig
# Flush every 5 seconds in a long-lived background worker:
core = Core(SDKConfig(data=project_config, auto_flush_interval_ms=5000)).initialize()| Runtime | Recommended flush point |
|---|---|
| Django / Flask (WSGI) | Response middleware process_response() hook |
| FastAPI / Starlette (ASGI) | Background task or finally in the route handler |
| AWS Lambda | End of handler, before return
|
| CLI / script |
finally block after main logic |
| Long-running server | Context manager (with Core(...).initialize() as core:) |
Register handlers with core.on(event, handler). Handlers receive
(payload, error=None) — error is non-None only when the release failed.
Handlers that raise are isolated and logged; they never affect evaluation or
delivery. Safe to call before initialize():
from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload, ConversionEventPayload
def on_queue_released(payload: QueueReleasedPayload, error=None) -> None:
if error is not None:
print(f"delivery failed: status={payload.status_code} retries={payload.retry_attempts}")
else:
print(
f"delivered batch: reason={payload.reason} "
f"events={payload.event_count} visitors={payload.visitor_count}"
)
def on_conversion(payload: ConversionEventPayload, error=None) -> None:
print(f"conversion queued: visitor={payload.visitor_id} goal={payload.goal_key}")
def on_config_updated(payload, error=None) -> None:
print(f"config swapped: project={payload['project_id']} entities={payload['entity_counts']}")
core.on(LifecycleEvent.API_QUEUE_RELEASED, on_queue_released)
core.on(LifecycleEvent.CONVERSION, on_conversion)
core.on(LifecycleEvent.CONFIG_UPDATED, on_config_updated)| Event | When fired | Payload type |
|---|---|---|
LifecycleEvent.READY |
initialize() succeeds |
internal |
LifecycleEvent.CONFIG_UPDATED |
Background refresh swaps in a new snapshot, or on first successful init |
dict with account_id, project_id, entity_counts
|
LifecycleEvent.CONVERSION |
A conversion event is enqueued (not on dedup or miss) | ConversionEventPayload |
LifecycleEvent.API_QUEUE_RELEASED |
The tracking queue is released (success or delivery failure) | QueueReleasedPayload |
LifecycleEvent.DATA_STORE_QUEUE_RELEASED |
(JS parity; out of MVP scope) | — |
QueueReleasedPayload fields:
| Field | Type | Meaning |
|---|---|---|
reason |
ReleaseReason |
Why the release fired: EXPLICIT, BATCH_SIZE, TIMER, or ATEXIT
|
batch_size |
int |
Number of events delivered in this batch |
visitor_count |
int |
Distinct visitor IDs in this batch |
event_count |
int |
Total event records delivered |
status_code |
int | None |
HTTP status on failure; None on success |
retry_attempts |
int | None |
Exhausted retry count; None or 0 if the adapter does not retry |
Import QueueReleasedPayload and ConversionEventPayload from
convert_sdk.events.
When you need to know why an experience or feature did not resolve — without
raising — use the diagnose_* methods. They return typed diagnostic objects
and never raise on a normal miss:
from convert_sdk import DiagnosticReason
# Experience diagnostic:
diag = context.diagnose_experience("checkout-experiment")
print(diag.resolved) # bool — True only when RESOLVED
print(diag.reason) # DiagnosticReason enum, e.g. AUDIENCE_MISMATCH
print(diag.reason.value) # str wire value, e.g. "audience_mismatch"
print(diag.message) # human-readable description
print(dict(diag.details)) # allowlist-safe detail map# Feature — RESOLVED, FEATURE_NOT_FOUND, or FEATURE_NOT_IN_SELECTED_VARIATIONS:
feat_diag = context.diagnose_feature("checkout-banner")
# Goal — RESOLVED or GOAL_NOT_FOUND:
goal_diag = context.diagnose_goal("purchase_completed")
# Config-entity lookup — RESOLVED, ENTITY_NOT_FOUND, or PROJECT_MAPPING_REQUIRED:
entity_diag = context.diagnose_entity("experience", "checkout-experiment")| Value | String | Produced by |
|---|---|---|
RESOLVED |
"resolved" |
All diagnose_* — visitor qualified and the entity resolved |
AUDIENCE_MISMATCH |
"audience_mismatch" |
diagnose_experience — experience exists but visitor did not qualify |
EXPERIENCE_NOT_FOUND |
"experience_not_found" |
diagnose_experience — key not in config |
FEATURE_NOT_FOUND |
"feature_not_found" |
diagnose_feature — key not in config |
FEATURE_NOT_IN_SELECTED_VARIATIONS |
"feature_not_in_selected_variations" |
diagnose_feature — feature declared but visitor's variation does not carry it |
GOAL_NOT_FOUND |
"goal_not_found" |
diagnose_goal — key not in config |
ENTITY_NOT_FOUND |
"entity_not_found" |
diagnose_entity — key/id not found or wrong entity_type
|
PROJECT_MAPPING_REQUIRED |
"project_mapping_required" |
diagnose_entity — loaded config has no project mapping |
Miss-path reasons are also emitted as structured log records through the
diagnostic logger ("convert_sdk" namespace). See Diagnostics
for the full log-record field reference.
core.close() stops the background refresh thread (if running), cancels the
periodic-flush timer (if set), and closes the httpx transport if Core
created it. It does not perform a final flush:
# Explicit call:
core.flush() # deliver any remaining queued events first
core.close() # then release resources
# Or use the context-manager form (close is called automatically on exit):
with Core(SDKConfig(data=project_config)).initialize() as core:
...close() is idempotent. Calling it multiple times is safe.
- Configuration — every option of every config dataclass
- Type Hints — dataclasses, Protocols, and enums for every result type
- Diagnostics — structured log fields, error codes, support workflow
-
Extending — custom
Transport,DataStore, event-bus observers - Running Experiences — detailed experience-evaluation guide
- Running Features — feature-flag resolution and typed variables
- Tracking Conversions — deduplication, attribution, payload reference
Copyrights © 2025 All Rights Reserved by Convert Insights, Inc.
Getting Started
Python SDK
- Quickstart
- Installation
- Initialization
- Configuration
- Code Examples
- Type Hints
- Diagnostics
- Extending
- Testing
- Async & Frameworks
Migration
Core Concepts
- Experiences & Variations
- Feature Flags
- Bucketing Algorithm
- Rule Evaluation
- Segments
- Data Management
- Event System
- API Communication
How-To Guides
- Running Experiences
- Running Features
- Tracking Conversions
- Visitor Context
- Persistent DataStore
- Troubleshooting
Edge & Integrations
Maintainers