This system synchronises records from our internal system (System A) to various external CRM providers (System B). It focuses on the Internal -> External direction: consuming internal change events, transforming them, and applying the corresponding CRUD operations on the external API while respecting rate limits.
- The internal system only emits events for genuine changes.
- A single record / record_type syncs to/from a single external system (this can be extended to sync to multiple systems, but not for external-to-internal sync).
- Events are at record level, one event per record.
- Eventual consistency is fine; System A and System B may temporarily be out of sync.
- Simple conflict resolution: last write wins.
- record_id is unique and immutable within our internal system.
- **Event Bus** (mocked; most likely Kafka) — a persistent log stream with per-record ordering guarantees. With Kafka we can partition on `(org_id, record_type) + record_id`, ensuring ordering within a partition.
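As a sketch of how a stable partition could be derived from the event key (the `partition_for` helper and key layout are illustrative, not the actual Kafka partitioner):

```python
import hashlib

def partition_for(org_id: str, record_type: str, record_id: str,
                  num_partitions: int) -> int:
    # Hash the composite key so every event for the same record
    # deterministically lands on the same partition (preserving order).
    key = f"{org_id}:{record_type}:{record_id}".encode()
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# Events for the same record always map to the same partition:
assert partition_for("org-1", "contact", "rec-42", 12) == \
       partition_for("org-1", "contact", "rec-42", 12)
```

Kafka's default partitioner (murmur2 over the message key) gives the same property; the essential point is only that the record identity must be part of the message key.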
- **Transformer Registry** (mocked) — ideally the transformation is config-based (orgs store their transformations, eventually persisted in a config file per object for a particular CRM provider); if none is provided, a global transformation is applied.
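A minimal in-memory sketch of the lookup-with-fallback behaviour (the `TransformerRegistry` class and the `(org_id, record_type, provider)` key are illustrative assumptions, not the real interface):

```python
from typing import Callable, Dict, Tuple

Transformer = Callable[[dict], dict]

def global_transform(record: dict) -> dict:
    # Default pass-through applied when no org-specific config exists.
    return dict(record)

class TransformerRegistry:
    def __init__(self) -> None:
        self._by_key: Dict[Tuple[str, str, str], Transformer] = {}

    def register(self, org_id: str, record_type: str, provider: str,
                 fn: Transformer) -> None:
        self._by_key[(org_id, record_type, provider)] = fn

    def resolve(self, org_id: str, record_type: str, provider: str) -> Transformer:
        # Fall back to the global transformation when no config is found.
        return self._by_key.get((org_id, record_type, provider), global_transform)

registry = TransformerRegistry()
registry.register("org-1", "contact", "crm_x", lambda r: {"FullName": r["name"]})
assert registry.resolve("org-1", "contact", "crm_x")({"name": "Ada"}) == {"FullName": "Ada"}
assert registry.resolve("org-2", "contact", "crm_x")({"name": "Ada"}) == {"name": "Ada"}
```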
- **Consumer Worker** — the core processing unit. Receives an event, checks idempotency, selects a transformer, calls the external API with retries, and updates the state store. If it still fails after retries, it puts the event on the DLQ.
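The worker's happy path and failure path can be sketched roughly as below; the collaborator interfaces (`idempotency`, `registry`, `state`, `api`, `dlq`) are simplified stand-ins for the mocked components, not their real signatures:

```python
def process_event(event: dict, idempotency, registry, state, api, dlq,
                  max_retries: int = 3) -> str:
    # 1. Idempotency: skip events that have already been applied.
    key = (event["org_id"], event["record_id"], event["record_type"], event["version"])
    if idempotency.seen(key):
        return "skipped"
    # 2. Transform the payload for the target provider.
    transform = registry.resolve(event["org_id"], event["record_type"], event["provider"])
    payload = transform(event["payload"])
    # 3. Call the external API with bounded retries.
    for _ in range(max_retries):
        try:
            external_id = api.upsert(payload)
        except Exception:
            continue
        # 4. Record the internal_id -> external_id mapping, then mark done.
        state.put(event["org_id"], event["provider"], event["record_id"], external_id)
        idempotency.mark(key)
        return "ok"
    # 5. Retries exhausted: hand the event to the DLQ.
    dlq.publish(event)
    return "dlq"
```

Marking the idempotency key only after a successful upsert keeps the system safe under at-least-once delivery: a crash mid-call means the event is redelivered and retried rather than silently skipped.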
- **Scheduler** — reads events from a shared queue in batches, hashes the `internal_id`, and routes them to the correct Consumer Worker to maintain per-record ordering.
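A sketch of the hash-based routing (`worker_index` and `route` are illustrative names); a stable digest is used instead of Python's built-in `hash`, which is salted per process:

```python
import hashlib

def worker_index(internal_id: str, num_workers: int) -> int:
    # Stable hash so the same record always routes to the same worker.
    digest = hashlib.md5(internal_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_workers

def route(batch, num_workers):
    # All events for a given internal_id land in the same bucket,
    # so per-record ordering is preserved end to end.
    buckets = [[] for _ in range(num_workers)]
    for event in batch:
        buckets[worker_index(event["internal_id"], num_workers)].append(event)
    return buckets
```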
- **StateStore** (mocked) — maintains the `internal_id` -> `external_id` translation per provider (CRM / external system).
- **IdempotencyStore** (mocked) — ensures exactly-once processing semantics on top of an at-least-once delivery system. Key: `(org_id, record_id, record_type, version)` or `event_id`; for external-to-internal sync, key: `(org_id, external_id, record_type)`.
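A toy in-memory version of the check; in production this would more likely be an atomic Redis `SET key NX` with a TTL (the class and method names here are illustrative):

```python
class InMemoryIdempotencyStore:
    def __init__(self) -> None:
        self._seen = set()

    def check_and_mark(self, key) -> bool:
        # True only the first time a key is observed, so retried
        # deliveries of the same event are processed exactly once.
        if key in self._seen:
            return False
        self._seen.add(key)
        return True

store = InMemoryIdempotencyStore()
key = ("org-1", "rec-42", "contact", 7)
assert store.check_and_mark(key) is True   # first delivery: process
assert store.check_and_mark(key) is False  # redelivery: skip
```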
- **CredentialStore** (mocked) — stores and manages org-specific authentication credentials required to interact with external systems.
- **Rate Limit Cache** (mocked) — tracks and enforces rate limits per tenant and provider to prevent throttling by external systems; most likely a token bucket implemented in Redis. This is a rate-limit checker rather than a rate limiter, and it should fail OPEN when it is down. Key: `(org_id, provider, api [optional])`.
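A single-process token-bucket sketch with the fail-open behaviour (in Redis this would typically be a small atomic Lua script; `check_rate_limit` and the key shape are assumptions for illustration):

```python
import time

class TokenBucket:
    def __init__(self, rate_per_sec: float, capacity: float) -> None:
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def allow(self) -> bool:
        # Refill proportionally to elapsed time, then try to spend one token.
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

def check_rate_limit(cache, org_id, provider, api=None) -> bool:
    try:
        bucket = cache[(org_id, provider, api)]
    except Exception:
        return True  # fail OPEN: if the cache is unavailable, let the call through
    return bucket.allow()

buckets = {("org-1", "crm_x", None): TokenBucket(rate_per_sec=1.0, capacity=2)}
assert check_rate_limit(buckets, "org-1", "crm_x")      # token 1
assert check_rate_limit(buckets, "org-1", "crm_x")      # token 2
assert not check_rate_limit(buckets, "org-1", "crm_x")  # bucket drained
assert check_rate_limit(None, "org-1", "crm_x")         # cache "down" -> fail open
```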
- **Dead Letter Queue (DLQ)** (mocked) — ideally another Kafka topic.
We use uv for dependency management and execution.
- Install uv (if not already installed): `curl -LsSf https://astral.sh/uv/install.sh | sh`
- Create a virtual environment and install dependencies: `uv sync`
- Run the demo: `uv run python -m sync_service.demo`
- Run tests: `uv run pytest tests/`
- For 300M syncs daily ≈ 3,500/sec on average (up to ~35,000/sec with bursts): convert to async Python or, preferably, Golang.
- More test coverage.
- ConsumerE2I (external-to-internal): consumer implementation for sync changes arriving either via webhook events or via a routine sync (the routine sync should fetch all records updated since the last sync).
