Extending
The SDK is built around three extension points, each defined as a
`@runtime_checkable` `Protocol` in `src/convert_sdk/ports/`. You supply custom
implementations through `Core`'s keyword-only constructor argument (`transport=`) or
through `SDKConfig` (`data_store=`). There is no `EventBus` injection:
the SDK owns the bus, and you observe it through `Core.on(...)`.
Implement the Transport protocol to replace how the SDK fetches config and
delivers tracking events. Common reasons: mTLS, proxy routing, stubbing in
integration tests, or logging delivery calls.
```python
from typing import Any, Dict, List

from convert_sdk import Core, SDKConfig
from convert_sdk.ports.transport import Transport


class StubTransport:
    """Test transport that returns canned responses."""

    def __init__(self, config_payload: Dict[str, Any]) -> None:
        self._config = config_payload
        # Every payload passed to send_tracking is recorded here for assertions.
        self.tracking_calls: List[Dict[str, Any]] = []

    def fetch_config(self, config) -> Dict[str, Any]:
        return self._config

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        self.tracking_calls.append(payload)

    def close(self) -> None:
        pass

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
        return False


stub = StubTransport(config_payload={
    "account_id": "1001",
    "project": {"id": "2002", "name": "Test"},
    "experiences": [],
    "features": [],
    "goals": [],
})

# transport= is KEYWORD-ONLY on Core
core = Core(
    SDKConfig(sdk_key="test-key"),
    transport=stub,
).initialize()
```

The `Transport` protocol is structural and `@runtime_checkable`: your class only
needs to implement `fetch_config(config)`, `send_tracking(payload, *, sdk_key)`,
`close()`, and the `__enter__`/`__exit__` pair. No base class is required.
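To illustrate what "structural and `@runtime_checkable`" means in practice, the sketch below defines a local stand-in protocol, `TransportLike`, that mirrors the method names listed above. The name is hypothetical and is used only so the example runs without the SDK installed; the real protocol lives in `convert_sdk.ports.transport`. Note that `isinstance` against a runtime-checkable protocol verifies only that the methods exist, not that their signatures match.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class TransportLike(Protocol):
    """Hypothetical stand-in mirroring the Transport method names."""

    def fetch_config(self, config: Any) -> Dict[str, Any]: ...
    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None: ...
    def close(self) -> None: ...
    def __enter__(self) -> "TransportLike": ...
    def __exit__(self, *exc: Any) -> bool: ...


class CompleteTransport:
    """Implements every method; no base class involved."""

    def fetch_config(self, config):
        return {}

    def send_tracking(self, payload, *, sdk_key):
        pass

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False


class IncompleteTransport:
    """Missing send_tracking, close, and the context-manager pair."""

    def fetch_config(self, config):
        return {}


# isinstance() only checks that the named methods are present.
complete_ok = isinstance(CompleteTransport(), TransportLike)
incomplete_ok = isinstance(IncompleteTransport(), TransportLike)
```

Here `complete_ok` is `True` and `incomplete_ok` is `False`, which is why a signature mistake in a custom transport would surface only when the method is actually called.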
| Method | Signature | Description |
|---|---|---|
| `fetch_config` | `(config: SDKConfig) -> dict` | Fetch and return the raw config payload |
| `send_tracking` | `(payload: dict, *, sdk_key: str) -> None` | Deliver a serialized tracking batch |
| `close` | `() -> None` | Release any held resources |
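One of the use cases named earlier, logging delivery calls, can be handled by wrapping another transport rather than reimplementing it. The sketch below is a minimal illustration under stated assumptions: `LoggingTransport` and the logger name `myapp.transport` are hypothetical, and `inner` is assumed to be any object exposing the three methods in the table.

```python
import logging
from typing import Any, Dict

log = logging.getLogger("myapp.transport")  # hypothetical logger name


class LoggingTransport:
    """Hypothetical wrapper that logs each call, then delegates to `inner`."""

    def __init__(self, inner: Any) -> None:
        self._inner = inner

    def fetch_config(self, config: Any) -> Dict[str, Any]:
        log.info("fetching config")
        return self._inner.fetch_config(config)

    def send_tracking(self, payload: Dict[str, Any], *, sdk_key: str) -> None:
        # Log only the top-level keys, not the payload body or the SDK key.
        log.info("sending tracking payload with keys %s", sorted(payload))
        self._inner.send_tracking(payload, sdk_key=sdk_key)

    def close(self) -> None:
        self._inner.close()

    def __enter__(self) -> "LoggingTransport":
        return self

    def __exit__(self, *exc: Any) -> bool:
        self.close()
        return False
```

A wrapper like this would be passed in the same way as any other transport, through the `transport=` keyword on `Core`.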
Implement DataStore to persist visitor state and goal dedup records across
process restarts (e.g. Redis, Postgres, Memcache). Inject it via
SDKConfig(data_store=my_store) — not as a Core argument.
The required surface is exactly four methods: get, set, has, delete.
```python
import json

from convert_sdk import Core, SDKConfig
from convert_sdk.ports.storage import DataStore, visitor_state_key


class RedisDataStore:
    """Example: Redis-backed visitor state store."""

    def __init__(self, redis_client) -> None:
        self._redis = redis_client

    def get(self, key: str):
        raw = self._redis.get(key)
        if raw is None:
            return None
        return json.loads(raw)

    def set(self, key: str, value, ttl=None) -> None:
        if ttl is not None:
            # SETEX stores the value with an expiry in seconds.
            self._redis.setex(key, ttl, json.dumps(value))
        else:
            self._redis.set(key, json.dumps(value))

    def has(self, key: str) -> bool:
        return bool(self._redis.exists(key))

    def delete(self, key: str) -> None:
        self._redis.delete(key)


# data_store is a CONFIG FIELD, not a Core argument.
# project_config and my_redis are assumed to be defined elsewhere.
core = Core(
    SDKConfig(data=project_config, data_store=RedisDataStore(redis_client=my_redis)),
).initialize()
```

The built-in `InMemoryDataStore` is thread-safe but process-local, so with the
default store, goal deduplication state resets whenever the process restarts.
| Method | Signature | Description |
|---|---|---|
| `get` | `(key: str) -> Any` | Return stored value or `None` if absent/expired |
| `set` | `(key: str, value: Any, ttl: int \| None = None) -> None` | Store value; `ttl` is expiry in seconds |
| `has` | `(key: str) -> bool` | Return `True` if key has a present, unexpired value |
| `delete` | `(key: str) -> None` | Remove key; no-op if absent |
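The contract in the table (expired entries read as absent, and `delete` is a no-op for missing keys) is easiest to see in a small in-process implementation. The sketch below is not the SDK's `InMemoryDataStore`; `DictDataStore` is a hypothetical name, and the injectable `clock` parameter is an assumption added so that expiry can be exercised without sleeping.

```python
import threading
import time
from typing import Any, Callable, Dict, Optional, Tuple


class DictDataStore:
    """Hypothetical dict-backed store with the four-method surface."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._lock = threading.Lock()
        # key -> (value, absolute expiry time, or None for "never expires")
        self._items: Dict[str, Tuple[Any, Optional[float]]] = {}

    def _live(self, key: str) -> Optional[Tuple[Any, Optional[float]]]:
        """Return the entry if present and unexpired; evict it if expired."""
        entry = self._items.get(key)
        if entry is None:
            return None
        _, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._items[key]
            return None
        return entry

    def get(self, key: str) -> Any:
        with self._lock:
            entry = self._live(key)
            return None if entry is None else entry[0]

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        expires_at = None if ttl is None else self._clock() + ttl
        with self._lock:
            self._items[key] = (value, expires_at)

    def has(self, key: str) -> bool:
        with self._lock:
            return self._live(key) is not None

    def delete(self, key: str) -> None:
        with self._lock:
            self._items.pop(key, None)  # no-op if the key is absent
```

Like the built-in store, this keeps state in process memory only, so it is suited to tests and local experiments rather than to persistence across restarts.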
For background on when the data store is read and written, see the shared Persistent DataStore guide.
The SDK uses Python's stdlib logging module throughout. There is no separate
logging Protocol — logging is the stdlib seam. Pass your own logging.Logger
via SDKConfig.logger:
```python
import logging

from convert_sdk import Core, SDKConfig

my_logger = logging.getLogger("myapp.convert")
# project_config is assumed to be defined elsewhere.
core = Core(SDKConfig(data=project_config, logger=my_logger)).initialize()
```

The SDK emits through your logger. It never adds handlers, sets levels, or
calls `logging.basicConfig()`.
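Because the SDK leaves handler and level configuration to the application, the logger you pass in needs a handler somewhere in its hierarchy before messages appear anywhere. The sketch below uses only the stdlib; the logger name matches the example above, while the format string and the in-memory stream are illustrative choices rather than SDK requirements.

```python
import io
import logging

# Capture output in memory here; a real application would more likely
# use logging.StreamHandler() (stderr) or a file handler.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

my_logger = logging.getLogger("myapp.convert")
my_logger.addHandler(handler)
my_logger.setLevel(logging.INFO)
my_logger.propagate = False  # keep these records out of the root logger's handlers

# Stand-ins for messages the SDK would emit through the injected logger.
my_logger.info("config loaded")
my_logger.debug("dropped because the logger level is INFO")

captured = stream.getvalue()
```

After this runs, `captured` holds a single line for the `INFO` record; the `DEBUG` record is filtered out by the logger's level.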
```python
# Suppress SDK messages below WARNING in production
logging.getLogger("convert_sdk").setLevel(logging.WARNING)
```

You do not implement the `EventBus` Protocol directly: the SDK owns the bus.
Observe it through `Core.on`, registering handlers for `LifecycleEvent` values.
Handlers receive `(payload, error=None)`:
```python
from convert_sdk import LifecycleEvent
from convert_sdk.events import QueueReleasedPayload


def on_queue_released(payload: QueueReleasedPayload, error=None) -> None:
    if error is not None:
        print(f"delivery failed: {error}")
    else:
        print(f"released {payload.batch_size} events")


core.on(LifecycleEvent.API_QUEUE_RELEASED, on_queue_released)
```

A handler that raises is isolated, logged, and swallowed; it can never break tracking or delivery.
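A handler can be any callable with the `(payload, error=None)` shape, so a small stateful object works as well as a function. The sketch below tallies released events and failed deliveries; `DeliveryStats` is a hypothetical name, and the only payload attribute it assumes is `batch_size`, as used in the example above.

```python
from typing import Any, Optional


class DeliveryStats:
    """Hypothetical callable handler that tallies queue-release outcomes."""

    def __init__(self) -> None:
        self.events_released = 0
        self.failed_batches = 0
        self.last_error: Optional[BaseException] = None

    def __call__(self, payload: Any, error: Optional[BaseException] = None) -> None:
        if error is not None:
            self.failed_batches += 1
            self.last_error = error
            return
        # Assumes the payload exposes batch_size, as in the example above.
        self.events_released += payload.batch_size


stats = DeliveryStats()
# Registration would mirror the function-based example:
# core.on(LifecycleEvent.API_QUEUE_RELEASED, stats)
```

Keeping the counters on an object makes them easy to read from a health check or a test assertion without relying on printed output.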
The most common use of the extension points is in tests: supply a stub transport
with canned config payloads to avoid network calls, and use a fresh
InMemoryDataStore per test to isolate deduplication state:
```python
from convert_sdk import Core, SDKConfig, InMemoryDataStore


class CannedTransport:
    def __init__(self, payload):
        self._payload = payload
        self.tracking_calls = []

    def fetch_config(self, config):
        return self._payload

    def send_tracking(self, payload, *, sdk_key):
        self.tracking_calls.append(payload)

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
        return False


def make_test_core(config_payload):
    return Core(
        # A fresh store per call isolates deduplication state between tests.
        SDKConfig(sdk_key="test-key", data_store=InMemoryDataStore()),
        transport=CannedTransport(config_payload),
    ).initialize()
```
- Initialization: `SDKConfig`, `TransportConfig`, `RefreshConfig`
- Code Examples, Queue Control: lifecycle events as an alternative to subclassing
- Diagnostics: diagnostic logging configuration
- Async and Framework Integrations: how the same Protocol surfaces extend to the planned async API
Copyrights © 2025 All Rights Reserved by Convert Insights, Inc.