Push Cache is a high-performance, in-memory caching service written in Rust. It consumes customer data from a Kafka topic (Avro formatted) and exposes it via a fast HTTP API. It is designed to be a sidecar or microservice that provides low-latency access to eventually consistent data.
Using Istio to redirect traffic to the cache and fall back to the originating service on failure is neither viable nor sensible: the fallback would risk hitting the originating service with more traffic than it can handle. This cache should therefore be treated as the authoritative endpoint for the data.
The service consists of two main components running concurrently:
- Kafka Consumer: Ingests `Customer` updates from a Kafka topic, deserializes Avro messages, and updates the in-memory cache. It handles "tombstone" records (null payload) by removing entries.
- Web Service: An Axum-based HTTP server that serves the cached data to clients.
```mermaid
graph TD
    K[Kafka Topic] -- Avro Messages --> C(Kafka Consumer)
    SR[Schema Registry] -- Schema Validation --> C
    C -- Insert/Update/Remove --> Router{Cache Router}
    Router --> M[(In-Memory Cache)]
    Router --> Redis[(Redis Cache)]
    Client[HTTP Client] -- GET /api/* --> API(Web Service)
    API -- Lookup --> Router
    M -- Data --> API
    Redis -- Data --> API
    API -- JSON Response --> Client
```
The service does not implement an internal Time-To-Live (TTL) or Least Recently Used (LRU) eviction policy. Instead, it relies on explicit upstream signals:
- Tombstone Events: Records are removed from the cache only when a "tombstone" event (a record with a null value) is received from the Kafka topic.
- Implication: The upstream producer is responsible for managing the lifecycle of data. If the upstream does not send tombstones, the cache will grow indefinitely.
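This event-driven lifecycle can be sketched with a plain map. The helper below is illustrative (the real consumer deserializes Avro records via the Schema Registry first), but the insert-or-remove rule is the same:

```rust
use std::collections::HashMap;

// Illustrative cache update rule: a value means upsert,
// a null payload (tombstone) means remove the key.
fn apply_event(cache: &mut HashMap<String, String>, key: &str, payload: Option<String>) {
    match payload {
        Some(value) => {
            cache.insert(key.to_string(), value);
        }
        None => {
            cache.remove(key);
        }
    }
}

fn main() {
    let mut cache = HashMap::new();
    apply_event(&mut cache, "acct-1", Some("{\"name\":\"Ada\"}".to_string()));
    assert!(cache.contains_key("acct-1"));

    // Tombstone: a null payload removes the entry.
    apply_event(&mut cache, "acct-1", None);
    assert!(!cache.contains_key("acct-1"));
}
```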
The core data entity is the Customer.
| Field | Type | Description |
|---|---|---|
| `accountId` | String | Unique identifier (Key) |
| `name` | String | Customer Name |
| `address` | String | Customer Address |
| `phone` | String | Contact Phone |
| `createdAt` | i64 | Creation timestamp |
| `updatedAt` | i64 | Last update timestamp |
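As a sketch, the `Customer` model maps onto a plain Rust struct like the one below. The snake_case field names and derives are assumptions; the Avro field names are as in the table:

```rust
// Sketch of the core cache entity; fields mirror the table above.
#[derive(Debug, Clone)]
struct Customer {
    account_id: String, // accountId: unique identifier, used as the cache key
    name: String,
    address: String,
    phone: String,
    created_at: i64,    // createdAt: creation timestamp
    updated_at: i64,    // updatedAt: last update timestamp
}

fn main() {
    let customer = Customer {
        account_id: "acct-1".to_string(),
        name: "Ada Lovelace".to_string(),
        address: "1 Analytical Way".to_string(),
        phone: "+44 20 0000 0000".to_string(),
        created_at: 1_700_000_000_000,
        updated_at: 1_700_000_100_000,
    };
    assert_eq!(customer.account_id, "acct-1");
    assert!(customer.updated_at >= customer.created_at);
}
```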
The system also supports these additional Avro message types for testing and development:
Represents a customer billing statement.
| Field | Type | Description |
|---|---|---|
| `accountId` | String | Associated account ID |
| `year` | i32 | Billing year |
| `totalAmount` | f64 | Total amount due |
| `payments` | Vec&lt;Payment&gt; | List of payments made |
| Field | Type | Description |
|---|---|---|
| `date` | String | Payment date (RFC3339) |
| `amount` | f64 | Payment amount |
| `method` | String | Payment method |
Tracks service usage by customers.
| Field | Type | Description |
|---|---|---|
| `accountId` | String | Associated account ID |
| `serviceType` | String | Type of service used |
| `amount` | f64 | Usage amount |
| `unit` | String | Unit of measurement (e.g., "GB") |
| `timestamp` | i64 | Usage timestamp (milliseconds) |
Represents a customer support ticket.
| Field | Type | Description |
|---|---|---|
| `ticketId` | String | Unique ticket identifier |
| `accountId` | String | Associated account ID |
| `issue` | String | Issue description |
| `status` | String | Current ticket status |
| `timestamp` | i64 | Creation timestamp (milliseconds) |
Retrieves a customer by their Account ID.
- URL: `/api/users/{account_id}`
- Method: `GET`
- Response: `200 OK` (JSON) or `404 Not Found`
- Headers:
  - `Cache-Control: public, max-age={config.seconds}`
  - `ETag: "{updatedAt}"`
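For illustration, the two header values can be derived as follows (the helper names are hypothetical, not the service's API):

```rust
// Cache-Control carries the configured max-age; the ETag is the
// record's updatedAt timestamp wrapped in quotes, per the docs above.
fn cache_control_value(max_age_seconds: u64) -> String {
    format!("public, max-age={max_age_seconds}")
}

fn etag_value(updated_at: i64) -> String {
    format!("\"{updated_at}\"")
}

fn main() {
    assert_eq!(cache_control_value(300), "public, max-age=300");
    assert_eq!(etag_value(1_700_000_100_000), "\"1700000100000\"");
}
```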
Manually adds a new customer to the cache.
- URL: `/api/users`
- Method: `POST`
- Body: JSON object matching the `Customer` model.
- Response:
  - `201 Created`: Returns the created customer.
  - `409 Conflict`: If the user already exists.
Manually removes a customer from the cache.
- URL: `/api/users/{account_id}`
- Method: `DELETE`
- Response:
  - `200 OK`: Returns the deleted customer.
  - `404 Not Found`: If the user does not exist.
Lists all customer keys (account IDs) in the cache. Supports pagination and filtering.
- URL: `/api/users`
- Method: `GET`
- Query Parameters:
  - `limit` (optional): Number of keys to return (default: all).
  - `offset` (optional): Number of keys to skip (default: 0).
  - `filter` (optional): Filter keys by substring.
- Response: `200 OK` with a JSON array of strings (keys).
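The query-parameter semantics amount to a filter, skip, take pipeline over the key set; a minimal sketch (the function name is illustrative):

```rust
// Illustrative key listing: substring filter first, then offset/limit.
fn list_keys(keys: &[&str], filter: Option<&str>, offset: usize, limit: Option<usize>) -> Vec<String> {
    keys.iter()
        .filter(|key| filter.map_or(true, |needle| key.contains(needle)))
        .skip(offset)
        .take(limit.unwrap_or(usize::MAX))
        .map(|key| key.to_string())
        .collect()
}

fn main() {
    let keys = ["acct-1", "acct-2", "test-1"];
    // filter=acct, offset=1, limit=1 -> only the second matching key
    assert_eq!(list_keys(&keys, Some("acct"), 1, Some(1)), vec!["acct-2".to_string()]);
    // No parameters: all keys
    assert_eq!(list_keys(&keys, None, 0, None).len(), 3);
}
```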
Configuration is handled via `figment` and can be supplied via a YAML file or environment variables (prefixed with `APP_`).
| Section | Key | Default | Description |
|---|---|---|---|
| webservice | `address` | `0.0.0.0:8080` | Bind address for the API |
| | `prefix` | `/api` | API path prefix |
| kafka | `brokers` | Required | Kafka bootstrap servers |
| | `group_id` | Required | Consumer group ID |
| | `topic` | Required | Topic name to consume |
| | `schema_registry_url` | Required | URL for Schema Registry |
| | `cache_max_age_seconds` | `300` | HTTP Cache-Control max-age |
| cache | `stores` | Required | List of store definitions (`in_memory`, `redis`) |
| | `routes` | Required | List of route mappings to stores |
Example `config.yaml`:

```yaml
webservice:
  address: "0.0.0.0:8080"
  prefix: "/api"

kafka:
  brokers: "localhost:9092"
  group_id: "push-cache-group"
  topic: "users"
  schema_registry_url: "http://localhost:8081"
  cache_max_age_seconds: 60

cache:
  stores:
    - name: "mem"
      type: "in_memory"
      schemas: [] # Optional: filter specific schemas if needed
    # - name: "main_redis"
    #   type: "redis"
    #   url: "redis://localhost:6379"
    #   prefix: "cache"
  routes:
    - path: "/customers"
      store: "mem"
```
The application uses a flexible caching layer that allows you to define multiple cache stores and map specific request paths to these stores using routes.
A store defines a backend where data is cached. Supported types are `in_memory` and `redis`.
Each store configuration must have a unique `name` and a `type` (`in_memory` or `redis`).
- `in_memory`: A fast, local memory cache.
  - `schemas` (optional): A list of Avro schema names that restricts what data is stored in this cache. If omitted or empty, the store accepts all schemas.
- `redis`: A Redis-backed cache for distributed setups.
  - `url`: The Redis connection string (e.g., `redis://localhost:6379`). Supports usernames and passwords.
  - `prefix` (optional): A string prefix prepended to all keys this application stores in Redis, which is useful when sharing a Redis instance among multiple services.
Routes determine which store handles the incoming requests based on the API URL path.
- `path`: The access path configured for this route (e.g., `/customers`).
- `store`: The `name` of the store, defined in `stores`, to which queries for this path should be routed.
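A minimal sketch of route resolution against the configured list (prefix matching is an assumption here; the service defines the actual matching rules):

```rust
// Illustrative route table: an ordered list of (path, store) pairs,
// matched against the request path by prefix.
fn resolve_store<'a>(routes: &'a [(String, String)], path: &str) -> Option<&'a str> {
    routes
        .iter()
        .find(|(route_path, _)| path.starts_with(route_path.as_str()))
        .map(|(_, store)| store.as_str())
}

fn main() {
    let routes = vec![
        ("/local-cache".to_string(), "fast_local".to_string()),
        ("/global-cache".to_string(), "global_redis".to_string()),
    ];
    assert_eq!(resolve_store(&routes, "/local-cache/acct-1"), Some("fast_local"));
    assert_eq!(resolve_store(&routes, "/unknown"), None);
}
```

An ordered list (rather than a map) keeps matching deterministic when route paths overlap: the first configured route wins.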
Example: Mixed Storage Configuration
```yaml
cache:
  stores:
    - name: "fast_local"
      type: "in_memory"
      schemas: ["customer", "bill"]
    - name: "global_redis"
      type: "redis"
      url: "redis://user:password@redis.internal.net:6379"
      prefix: "prod_v1"
  routes:
    - path: "/local-cache"
      store: "fast_local"
    - path: "/global-cache"
      store: "global_redis"
```

The repository includes a `populate_kafka` example tool for generating test data and publishing it to Kafka with proper Avro encoding.
- Supports multiple message types: `customer`, `bill`, `usage`, `ticket`
- Dynamic schema registration with Schema Registry
- Configurable topic and record count
- Statistics reporting (min/max/avg message sizes)
- Proper Avro encoding with magic byte and schema ID
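The "magic byte and schema ID" framing is the standard Confluent wire format: one zero byte, a 4-byte big-endian schema ID, then the Avro-encoded body. A sketch of that framing step (the function name is illustrative):

```rust
// Confluent wire format: 0x00 magic byte + 4-byte big-endian
// schema ID + Avro-encoded payload.
fn frame_message(schema_id: u32, avro_payload: &[u8]) -> Vec<u8> {
    let mut framed = Vec::with_capacity(5 + avro_payload.len());
    framed.push(0u8); // magic byte
    framed.extend_from_slice(&schema_id.to_be_bytes());
    framed.extend_from_slice(avro_payload);
    framed
}

fn main() {
    let framed = frame_message(42, &[1, 2, 3]);
    assert_eq!(framed[..5], [0, 0, 0, 0, 42]); // 5-byte header
    assert_eq!(framed[5..], [1, 2, 3]);        // payload follows
}
```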
Basic usage:

```sh
./scripts/populate-local.sh
```

Generate different message types:

```sh
# Generate customer records (default)
MESSAGE_TYPE=customer COUNT=100 ./scripts/populate-local.sh

# Load test customer endpoint
BASE_URL=http://localhost:8080/cache/customers ./scripts/load-test.sh

# Generate billing records
MESSAGE_TYPE=bill COUNT=50 ./scripts/populate-local.sh

# Load test bill endpoint
BASE_URL=http://localhost:8080/cache/bills ./scripts/load-test.sh

# Generate usage records
MESSAGE_TYPE=usage COUNT=200 ./scripts/populate-local.sh

# Load test usage endpoint
BASE_URL=http://localhost:8080/cache/usage ./scripts/load-test.sh

# Generate support tickets
MESSAGE_TYPE=ticket COUNT=25 ./scripts/populate-local.sh

# Load test ticket endpoint
BASE_URL=http://localhost:8080/cache/tickets ./scripts/load-test.sh
```

Override topic:

```sh
TOPIC=custom-topic COUNT=100 ./scripts/populate-local.sh
```

| Argument | Short | Default | Description |
|---|---|---|---|
| `--config` | `-c` | Required | Path to configuration YAML file |
| `--secrets` | `-s` | `secrets` | Directory containing secret files |
| `--message-type` | `-m` | `customer` | Message type: customer, bill, usage, or ticket |
| `--topic` | `-t` | From config | Override Kafka topic name |
| `--count` | `-n` | `100` | Number of records to generate |
For convenience, use the Makefile targets:

```sh
# Generate different types of test data
make populate-customers
make populate-bills
make populate-usage
make populate-tickets

# See all available populate targets
make populate-help
```

- Rust (latest stable)
- Kafka & Zookeeper (local or remote)
- Schema Registry
- `make`
- Docker & Docker Compose (optional, for running dependencies)
You can use Docker Compose to run the necessary infrastructure dependencies (Redis, Kafka, Zookeeper, Schema Registry). The Makefile provides convenient shortcuts:
```sh
# Start Redis
make compose-redis

# Start Kafka with Zookeeper (Standard)
make compose-kafka-zk

# Start Kafka in KRaft mode (Experimental/Newer)
make compose-kafka-kraft
```

- Start dependencies (in separate terminals or background):

  ```sh
  make start-zookeeper
  make start-kafka
  make start-schema
  ```

- Run the backend:

  ```sh
  make rust-container-dev
  ```
Run unit tests and doctests:

```sh
make rust-container-test
# OR directly:
cd rust-container && cargo test
```

The project uses `cargo-chef` for optimized Docker layer caching.

```sh
make rust-container-docker
```

- Metrics: Prometheus metrics are collected and exposed via `axum-prometheus`.
- Health Checks: Integrated via `libhams`.
- Logging: Structured logging via `tracing` and `tracing-subscriber`. Log level controlled via `CAPTURE_LOG` (default: WARN).
- `libhams`: Custom library for service health and management. (Linking handled automatically in `build.rs`.)
Single query performance of `DashState` (`DashMap`) with growing state sizes.
| State Size | Time (ns) | Trend |
|---|---|---|
| 100 | ~40.5 | Baseline |
| 1,000 | ~37.9 | Fast |
| 10,000 | ~56.1 | +48% |
| 100,000 | ~128.9 | +130% |
| 1,000,000 | ~267.1 | +107% |
| 5,000,000 | ~322.0 | +20% |
| 16,000,000 | ~350-400 | Projected |
To run benchmarks:

```sh
cargo bench --bench cache_benchmark
```

Throughput with 1,000,000 entries and varying concurrency (Tokio tasks).
| Concurrency | Throughput (QPS) |
|---|---|
| 1 | ~52 K |
| 10 | ~290 K |
| 50 | ~450 K |
| 100 | ~1.2 M |
| 500 | ~1.5 M |
| 1000 | ~1.9 M |
To run:

```sh
cargo bench --bench concurrent_benchmark
```

Throughput with 100 concurrent tasks and varying state sizes.
| State Size | Throughput (QPS) |
|---|---|
| 100,000 | ~1.16 M |
| 1,000,000 | ~1.55 M |
| 5,000,000 | (Incomplete) |
| Benchmark | Result | Notes |
|---|---|---|
| Single Insert Latency | ~167 ns | Overwrite existing key |
| Concurrent Insert Throughput | ~1.5 M QPS | 100 concurrent tasks |
To run insertions:

```sh
cargo bench --bench cache_benchmark -- insert_performance
cargo bench --bench concurrent_benchmark -- concurrent_insert_performance
```