Usman Abbas edited this page May 5, 2026 · 4 revisions

Code Examples

Complete, copy-paste examples for every public method on Core and Context. For the underlying types, see Type Hints. For init options, see Configuration.

Sections on this page:

  1. Initialization
  2. Visitor Context
  3. Evaluation: run_experience, run_experiences, run_feature, run_features
  4. Segments
  5. Tracking: track_conversion, deduplication, revenue
  6. Queue Control: release_queues, lifecycle events, batching
  7. Diagnostics: diagnose_* for non-raising lookups

Initialization

SDK key mode (network fetch)

import os
from convert_sdk import Core, SDKConfig, TransportConfig

core = Core(
    SDKConfig(
        sdk_key=os.environ["CONVERT_SDK_KEY"],
        environment="production",
        transport=TransportConfig(
            timeout_seconds=5.0,
        ),
    )
)
assert core.is_ready

Direct config mode (no network)

from convert_sdk import Core, SDKConfig

core = Core(SDKConfig(config_data={
    "account_id": "1001",
    "project": {"id": "2002", "name": "Demo"},
    "experiences": [],
    "features": [],
    "goals": [],
}))

Long-running service with auto-refresh

import os

from convert_sdk import Core, SDKConfig, LifecycleEvent
from convert_sdk.config import RefreshConfig

core = Core(
    SDKConfig(
        sdk_key=os.environ["CONVERT_SDK_KEY"],
        refresh=RefreshConfig(
            interval_seconds=300.0,
            jitter_seconds=30.0,
            backoff_initial_seconds=30.0,
            backoff_factor=2.0,
            backoff_max_seconds=600.0,
        ),
    )
)

# my_cache stands for your application's own cache object; it is not part of the SDK.
core.on(LifecycleEvent.CONFIG_UPDATED, lambda payload: my_cache.invalidate())

# Force a refresh on demand:
core.refresh_now()                       # fire-and-forget
ok = core.refresh_now(wait=True)         # block until the next attempt finishes

# Graceful shutdown:
core.close()
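
The backoff fields in RefreshConfig read like a capped exponential schedule. The sketch below works through the arithmetic for the values shown above, assuming the delay after each consecutive failure is multiplied by backoff_factor and clipped at backoff_max_seconds. That formula is an assumption, not confirmed SDK behavior, and backoff_delays is a hypothetical helper that ignores jitter.

```python
def backoff_delays(initial: float, factor: float, maximum: float, attempts: int) -> list:
    """Return capped exponential delays for `attempts` consecutive failures.

    Assumed formula (not confirmed by the SDK docs):
        delay_n = min(initial * factor ** n, maximum)
    """
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(min(delay, maximum))
        delay *= factor
    return delays


# Values taken from the RefreshConfig example above.
schedule = backoff_delays(initial=30.0, factor=2.0, maximum=600.0, attempts=7)
print(schedule)  # [30.0, 60.0, 120.0, 240.0, 480.0, 600.0, 600.0]
```

Under this assumption the retry delay reaches the 600-second ceiling on the sixth consecutive failure and stays there.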

Visitor Context

context = core.create_context(
    "visitor-abc123",
    {"tier": "premium", "country": "US"},
)

When visitor_attributes is supplied to create_context(), the stored attributes for that visitor are replaced (not merged) — only visitor_properties and default_segments carry over from any previously stored state. To merge new attributes onto existing ones, omit the second argument from create_context() and call:

context.update_visitor_attributes({"new_attr": "value"})           # merges
context.update_visitor_attributes({"new_attr": "value"}, replace=True)  # wipes and overwrites
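
The difference between merging and replacing can be pictured with plain dictionaries. This is only an illustration of the semantics described above, not SDK code: merge_attributes and replace_attributes are hypothetical helpers, and the sketch assumes a shallow merge in which an incoming key overrides a stored key of the same name.

```python
def merge_attributes(stored: dict, incoming: dict) -> dict:
    """Shallow merge: keep stored keys, let incoming keys win on conflict (assumed)."""
    return {**stored, **incoming}


def replace_attributes(stored: dict, incoming: dict) -> dict:
    """Replace: discard everything stored and keep only the incoming keys."""
    return dict(incoming)


stored = {"tier": "premium", "country": "US"}
incoming = {"new_attr": "value"}

merged = merge_attributes(stored, incoming)
replaced = replace_attributes(stored, incoming)

print(merged)    # {'tier': 'premium', 'country': 'US', 'new_attr': 'value'}
print(replaced)  # {'new_attr': 'value'}
```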

Context is reusable — call multiple evaluation methods on the same instance within a request lifecycle.

Evaluation

Single experience

result = context.run_experience(
    "checkout-flow",
    location_attributes={"path": "/checkout"},
)

if result is None:
    # visitor did not qualify (audience miss, location miss, or outside traffic)
    pass
else:
    print(result.experience_key, result.variation_key, result.bucket_value)

run_experience() returns ExperienceResult | None. A None result is a normal non-exceptional outcome — see Type Hints — ExperienceResult.

All applicable experiences

results = context.run_experiences(
    location_attributes={"path": "/checkout"},
)
# results: list[ExperienceResult], empty list when no match
for r in results:
    print(r.experience_key, r.variation_key)

Single feature

from convert_sdk import FeatureStatus

feature = context.run_feature(
    "checkout-banner",
    location_attributes={"path": "/checkout"},
)

if feature is None:
    pass  # feature disabled or visitor not in any backing experience
elif feature.status == FeatureStatus.ENABLED:
    title = feature.variables.get("title")
    print(title)

Variable type casting

Variables are type-cast based on the type field declared in the config. The supported types are boolean, integer, float, string, and json. Pass type_cast=False to run_feature() to skip the cast and receive each variable as it appears in the config snapshot:

feature = context.run_feature("checkout-banner", type_cast=False)
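
To make the casting step concrete, here is a minimal sketch of a cast driven by a declared type. It assumes raw values arrive as strings in the config snapshot, which this page does not confirm, and cast_variable is a hypothetical helper rather than the SDK's implementation.

```python
import json
from typing import Any


def cast_variable(raw: Any, declared_type: str) -> Any:
    """Cast a raw config value according to its declared type (illustrative only)."""
    if declared_type == "boolean":
        if isinstance(raw, bool):
            return raw
        # Assumed convention: "true" and "1" mean True, anything else means False.
        return str(raw).strip().lower() in ("true", "1")
    if declared_type == "integer":
        return int(raw)
    if declared_type == "float":
        return float(raw)
    if declared_type == "string":
        return str(raw)
    if declared_type == "json":
        # Parse JSON text; pass already-decoded values through unchanged.
        return json.loads(raw) if isinstance(raw, str) else raw
    raise ValueError(f"unsupported variable type: {declared_type!r}")


print(cast_variable("true", "boolean"))
print(cast_variable("42", "integer"))
print(cast_variable('{"title": "Sale"}', "json"))
```

With type_cast=False, the equivalent of this step would be skipped and the caller would receive the raw value.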

All applicable features

features = context.run_features(
    location_attributes={"path": "/checkout"},
)
for f in features:
    print(f.feature_key, f.status.value, dict(f.variables))

Per-evaluation visitor attribute overrides

result = context.run_experience(
    "beta-program",
    visitor_attributes={"beta_opt_in": True},
)

Overrides are not persisted; they apply only to that single evaluation call.

Segments

Default segments

Default segments are keys of named segment entities from the config. They are carried with every context and applied automatically during evaluation.

context.set_default_segments(["segment-premium-eu"])

Custom segment evaluation

matched = context.run_custom_segments(
    ["segment-premium-eu", "segment-mobile"],
    rule_data={"device": "mobile"},
)
# matched: tuple[str, ...] of the keys whose rules were satisfied

Tracking

Basic conversion

result = context.track_conversion("purchase")

print(result.queued_event_count)    # number of events queued
print(result.duplicate_prevented)   # True if deduplication blocked the event

Conversion with revenue data

result = context.track_conversion(
    "purchase",
    conversion_data={
        "revenue": 49.99,
        "products_count": 2,
        "order_id": "ORD-7842",
    },
)

Value types accepted in conversion_data:

| Accepted | Not accepted |
| --- | --- |
| int, float, str | bool |
| tuple[str, ...] / list[str] (tags) | dict, bytes, other sequences |

Booleans and non-string sequence items raise ConversionDataError.
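
If conversion_data is assembled from untrusted input, it may help to check it before calling track_conversion(). The sketch below mirrors the accepted and rejected types listed above. validate_conversion_value is a hypothetical helper, and it raises ValueError as a stand-in because the constructor of ConversionDataError is not documented on this page.

```python
def validate_conversion_value(key: str, value: object) -> None:
    """Raise ValueError if `value` is not an accepted conversion_data type."""
    # bool is a subclass of int in Python, so it must be rejected first.
    if isinstance(value, bool):
        raise ValueError(f"{key}: bool is not accepted")
    if isinstance(value, (int, float, str)):
        return
    if isinstance(value, (tuple, list)):
        # Sequences are accepted only when every item is a string (tags).
        if all(isinstance(item, str) for item in value):
            return
        raise ValueError(f"{key}: sequence items must all be str")
    raise ValueError(f"{key}: {type(value).__name__} is not accepted")


conversion_data = {"revenue": 49.99, "products_count": 2, "tags": ["sale", "mobile"]}
for key, value in conversion_data.items():
    validate_conversion_value(key, value)  # passes silently for accepted types
```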

When conversion_data is supplied, the SDK creates two events per conversion: one base conversion event (no data, for goal attribution) and one revenue/data event (carries the conversion_data).

Allowing repeat transactions

result = context.track_conversion(
    "purchase",
    conversion_data={"revenue": 29.99},
    force_multiple_transactions=True,
)

With force_multiple_transactions=True, the base conversion event is still sent only once, but an additional transaction event is queued for each subsequent call that includes conversion_data.
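
The interaction between deduplication and force_multiple_transactions can be modelled with a toy tracker. This is a sketch of the behavior described above, not the SDK's implementation: ToyConversionTracker is hypothetical, it assumes deduplication is keyed by goal for a single visitor, and it assumes a deduplicated call queues nothing.

```python
class ToyConversionTracker:
    """Toy model of per-goal deduplication for one visitor (illustrative only)."""

    def __init__(self):
        self._seen_goals = set()
        self.queue = []  # list of (event_kind, goal_key) tuples

    def track(self, goal_key, conversion_data=None, force_multiple_transactions=False):
        """Queue events for one call and return how many were queued."""
        queued = 0
        first_time = goal_key not in self._seen_goals
        if first_time:
            # The base conversion event is queued once per goal.
            self._seen_goals.add(goal_key)
            self.queue.append(("conversion", goal_key))
            queued += 1
        if conversion_data and (first_time or force_multiple_transactions):
            # The data-carrying event repeats only when explicitly forced.
            self.queue.append(("transaction", goal_key))
            queued += 1
        return queued


tracker = ToyConversionTracker()
print(tracker.track("purchase", {"revenue": 49.99}))  # 2: base + transaction
print(tracker.track("purchase", {"revenue": 29.99}))  # 0: deduplicated
print(tracker.track("purchase", {"revenue": 29.99}, force_multiple_transactions=True))  # 1
```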

Handling GoalNotFoundError

from convert_sdk import GoalNotFoundError

try:
    context.track_conversion("unknown-goal")
except GoalNotFoundError as exc:
    print(exc.code)     # "goal.not_found"
    print(exc.context)  # {"reason": "goal_not_found", "available_goal_count": N}

Queue Control

The SDK queues conversion events in-process and delivers them in HTTP POST batches only when you explicitly call release_queues().

Explicit flush

flush_result = context.release_queues(reason="end_of_request")

if flush_result.attempted:
    print(f"Delivered {flush_result.delivered_event_count} events "
          f"in {flush_result.delivered_batch_count} batches")
else:
    print("Queue was empty — nothing sent")

When to flush

| Runtime | Recommended flush point |
| --- | --- |
| Django / Flask (WSGI) | Response middleware process_response() hook |
| FastAPI / Starlette (ASGI) | Background task or response middleware |
| AWS Lambda | End of handler before return |
| CLI / script | finally block after main logic |
| Long-running service | Periodic background thread + shutdown hook |

# Django middleware example
class ConvertFlushMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        context = getattr(request, "convert_context", None)
        if context is not None:
            context.release_queues(reason="response_sent")
        return response

Batch size

# project_config is a config dict like the one shown under "Direct config mode (no network)".
from convert_sdk import Core, SDKConfig, TrackingConfig

core = Core(
    SDKConfig(
        config_data=project_config,
        tracking=TrackingConfig(batch_size=25),
    )
)
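
To see what batch_size means for a flush, the sketch below splits a queue into consecutive chunks. It assumes simple fixed-size chunking, which this page does not spell out, and split_into_batches is a hypothetical helper.

```python
def split_into_batches(events: list, batch_size: int) -> list:
    """Split `events` into consecutive chunks of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [events[i:i + batch_size] for i in range(0, len(events), batch_size)]


queued_events = list(range(60))  # 60 placeholder events
batches = split_into_batches(queued_events, batch_size=25)
print([len(batch) for batch in batches])  # [25, 25, 10]
```

Under that assumption, 60 queued events with batch_size=25 would go out as three HTTP POST requests.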

Lifecycle events

Subscribe to lifecycle events on the Core instance to observe queue activity without modifying SDK internals:

from convert_sdk import LifecycleEvent, LifecycleEventPayload

def on_event_queued(payload: LifecycleEventPayload) -> None:
    print(payload.event.value, payload.details)

def on_released(payload: LifecycleEventPayload) -> None:
    print("delivered", payload.details.get("delivered_event_count"))

def on_delivery_failed(payload: LifecycleEventPayload) -> None:
    print("DELIVERY FAILURE", payload.details.get("error_type"))

core.on(LifecycleEvent.TRACKING_EVENT_QUEUED, on_event_queued)
core.on(LifecycleEvent.QUEUE_RELEASED, on_released)
core.on(LifecycleEvent.TRACKING_DELIVERY_FAILED, on_delivery_failed)

Unsubscribe with core.off(event, handler).

| Event | Fired when |
| --- | --- |
| TRACKING_EVENT_QUEUED | Events are added to the queue by track_conversion() |
| QUEUE_RELEASE_STARTED | release_queues() begins draining a non-empty queue |
| QUEUE_RELEASED | All batches delivered successfully |
| TRACKING_DELIVERY_FAILED | An HTTP transport error interrupted delivery |
| CONVERSION_CREATED | A conversion result is built (before enqueue) |
| CONVERSION_DEDUPLICATED | Deduplication prevented a duplicate conversion event |
| CONFIG_UPDATED | Background refresh swapped in a new snapshot |

Delivery-failure handling

import httpx
from convert_sdk import TrackingError

try:
    context.release_queues(reason="end_of_request")
except (httpx.HTTPError, TrackingError):
    # The TRACKING_DELIVERY_FAILED handler already logged.
    # Events remain in the queue for the next flush.
    # Catch only transport-shaped failures here — programming errors
    # (TypeError, ValueError, etc.) should still surface so they get fixed.
    pass

Diagnostics

When you need to know why a visitor wasn't bucketed (or a goal wasn't found), use the diagnose_* variants. They return the full decision record without raising.

diag = context.diagnose_experience("checkout-flow")
print(diag.resolved)    # bool
print(diag.reason)      # "audience_miss" / "outside_traffic" / "bucketed" / ...
print(diag.message)     # human-readable description
print(diag.result)      # ExperienceResult | None
print(dict(diag.details))

feat_diag = context.diagnose_feature("checkout-banner")
goal_diag = context.diagnose_goal("purchase")
entity_diag = context.diagnose_config_entity("experience", "checkout-flow")

The full reason-code reference and field-by-field breakdown live in Diagnostics and Type Hints.
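
A diagnosis record is easy to turn into a one-line log entry. The sketch below relies only on the fields printed above (resolved, reason, details). format_diagnosis is a hypothetical helper, and the SimpleNamespace sample, including its audience_count detail key, is made-up data standing in for a real diagnose_experience() result.

```python
from types import SimpleNamespace


def format_diagnosis(label: str, diag) -> str:
    """Build a compact, sorted log line from a diagnose_* result."""
    status = "resolved" if diag.resolved else "unresolved"
    details = ", ".join(
        f"{key}={value!r}" for key, value in sorted(dict(diag.details).items())
    )
    return f"{label}: {status} reason={diag.reason} details=[{details}]"


# Made-up record for illustration; real records come from context.diagnose_*().
sample = SimpleNamespace(
    resolved=False,
    reason="audience_miss",
    message="visitor did not match the audience",
    result=None,
    details={"audience_count": 2},
)

line = format_diagnosis("checkout-flow", sample)
print(line)  # checkout-flow: unresolved reason=audience_miss details=[audience_count=2]
```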

Graceful shutdown

# Application shutdown hook
core.close()  # stops the refresher thread; idempotent

# Or use the context-manager form:
with Core(SDKConfig(...)) as core:
    context = core.create_context("visitor-1")
    ...
# core.close() called automatically on exit

What to read next

  • Configuration — every option of every config dataclass
  • Type Hints — dataclasses, Protocols, and enums
  • Diagnostics — log structure, error codes, support workflow
  • Extending — custom transport, storage, and event-bus
