AI agent–based customer support resolution: agents consume and produce Kafka events to triage, classify, and resolve tickets. Triage consumes ticket.created, classifies by type, and routes to type-specific topics; Billing, Technical, and Feature specialists consume from their topics and produce ticket.resolved.
Architecture Overview
The diagram above illustrates the Support Resolution System deployed on an AWS EKS Cluster. The core components are:
- Support Tickets: Customer tickets are created and sent to Kafka (ticket.events).
- Kafka (CFK): Central message bus; agents consume and produce to the following topics:
  - ticket.events: Input for new support tickets.
  - ticket.triaged.billing, ticket.triaged.technical, ticket.triaged.feature_request, ticket.triaged.other, ticket.triaged.human: Triage agent routes tickets here by classification.
  - ticket.resolved: Specialists publish resolved tickets here.
- Triage Agent: Consumes new tickets, enriches them using Amazon DynamoDB, classifies (via Ollama LLM), and produces to the appropriate triaged topic.
- Specialist Agents (Billing, Technical, Feature): Each consumes its routed tickets, processes them, and publishes the resolution to ticket.resolved.
- Observability: Prometheus, Trace IDs, and structured logging are used across all agents for monitoring and tracing.
Note: All topic names and message flows follow the naming and routing conventions in shared/topics.py.
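As an illustration of what such a topic mapping could look like, here is a minimal sketch. It is not the actual content of shared/topics.py: the constant names and the `topic_for` helper are hypothetical, and only the topic strings come from the list above. Unknown classifications fall back to the human queue.

```python
# Hypothetical sketch of a classification-to-topic mapping.
# The real shared/topics.py may be organized differently.
INPUT_TOPIC = "ticket.events"
RESOLVED_TOPIC = "ticket.resolved"
HUMAN_TOPIC = "ticket.triaged.human"

TRIAGED_TOPICS = {
    "billing": "ticket.triaged.billing",
    "technical": "ticket.triaged.technical",
    "feature_request": "ticket.triaged.feature_request",
    "other": "ticket.triaged.other",
}


def topic_for(ticket_type: str) -> str:
    """Return the triaged topic for a classification.

    Any type that is not in the mapping goes to the human queue
    instead of being dropped.
    """
    return TRIAGED_TOPICS.get(ticket_type.strip().lower(), HUMAN_TOPIC)
```

Keeping the mapping in one shared module means the triage producer and the specialist consumers cannot drift apart on topic names.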
- shared/ – Reusable libraries: shared/topics.py (topic mapping), shared/specialist_base.py, shared/aws/dynamodb.py.
- agents/ – triage (consumes ticket.events, produces to ticket.triaged.*), billing, technical, feature (consume type-specific topics, produce ticket.resolved). Each has Dockerfile and k8s manifests.
- events/ – JSON Schema for Kafka events. See events/README.md.
- infra/ – Terraform for DynamoDB, Prometheus stack, Pod Identity. See infra/README.md.
- scripts/ – create-kafka-topics.sh, e2e-triage.sh, e2e-specialists.sh.
- docs/observability.md – Trace IDs, Prometheus metrics.
- AWS CLI configured with credentials
- kubectl and Docker
- Terraform 1.x
- Access to deploy to an AWS account
Prerequisites: terraform, aws CLI, kubectl, docker on PATH. AWS credentials configured.
cd support-resolution-system
# Full provision: Kafka platform (from sibling repo) + infra + topics + agents
python scripts/provision.py --kafka-platform-path ../terraform-aws-confluent-platform --auto-approve
# Or if Kafka is already deployed:
python scripts/provision.py --cluster-name confluent-dev-eks --region us-east-1 --auto-approve
Use --mock-llm for E2E/CI (no LLM API calls). Use --skip-* to omit steps. Run python scripts/provision.py --help for options.
Follow these steps in order. All commands assume you are in the indicated directory.
From the terraform-aws-confluent-platform repo (a separate repo):
cd terraform-aws-confluent-platform/envs/dev
cp terraform.tfvars.example terraform.tfvars # if present
# Edit terraform.tfvars: set region, name (e.g. confluent-dev), cluster_version
terraform init
terraform apply
This creates the VPC, EKS cluster, and Confluent operator. Note the cluster name (e.g. confluent-dev-eks).
Still in terraform-aws-confluent-platform:
# From repo root
cd ../..
aws eks update-kubeconfig --name confluent-dev-eks --region us-east-1 # use your cluster name and region
kubectl apply -k manifests/overlays/dev
kubectl wait --for=jsonpath='{.status.readyReplicas}'=3 statefulset/zookeeper -n confluent --timeout=300s
kubectl wait --for=jsonpath='{.status.readyReplicas}'=3 statefulset/kafka -n confluent --timeout=300s
From terraform-aws-confluent-platform repo root:
ZONE_ID=$(terraform -chdir=envs/dev output -raw kafka_dns_zone_id)
ZONE_ID=$ZONE_ID ./scripts/create-kafka-dns.sh
This creates CNAMEs for kafka.confluent.local and b0/b1/b2.confluent.local in the VPC private zone. If Kafka services don't have EXTERNAL-IP yet, wait and retry.
From support-resolution-system:
cd support-resolution-system/infra
cp terraform.tfvars.example terraform.tfvars
# Edit terraform.tfvars:
# region = "us-east-1"
# cluster_name = "confluent-dev-eks" # must match Kafka platform
# kafka_dns_domain = "confluent.local"
terraform init
terraform apply
This creates the DynamoDB table (optional enrichment), the Prometheus stack, and Pod Identity for triage. Use cluster_name from Step 1.
From support-resolution-system repo root (kubectl context must be set to the EKS cluster):
cd ..
./scripts/create-kafka-topics.sh
Creates ticket.events, ticket.triaged.billing, ticket.triaged.technical, ticket.triaged.feature_request, ticket.triaged.account, ticket.triaged.other, ticket.triaged.human, ticket.resolved.
From support-resolution-system repo root (build context must include shared/):
export AWS_REGION=us-east-1
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ECR_REGISTRY="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com"
ECR_URI="${ECR_REGISTRY}/triage-agent:latest"
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_REGISTRY
aws ecr create-repository --repository-name triage-agent --region $AWS_REGION 2>/dev/null || true
docker build -f agents/triage/Dockerfile -t triage-agent:latest .
docker tag triage-agent:latest $ECR_URI
docker push $ECR_URI
Windows (PowerShell): Use $env:AWS_REGION, $env:AWS_ACCOUNT_ID, etc., and replace variable expansion with your values if needed.
Edit agents/triage/k8s/deployment.yaml and set spec.template.spec.containers[0].image to your ECR URI (e.g. 123456789012.dkr.ecr.us-east-1.amazonaws.com/triage-agent:latest).
Or on Linux/macOS:
sed -i.bak "s|REPLACE_ME|$ECR_URI|g" agents/triage/k8s/deployment.yaml
# Or, if the file has a different placeholder:
sed -i.bak "s|940534692014.dkr.ecr.us-east-1.amazonaws.com/triage-agent:latest|$ECR_URI|g" agents/triage/k8s/deployment.yaml
From support-resolution-system repo root:
kubectl apply -f agents/triage/k8s/namespace.yaml
kubectl apply -f agents/triage/k8s/serviceaccount.yaml
kubectl apply -f agents/triage/k8s/configmap.yaml
# In-cluster Ollama (CPU-only, qwen2.5:0.5b; no API key needed)
kubectl apply -f agents/triage/k8s/ollama.yaml
# Secret (use "not-used" for Ollama-only; replace with real key for Anthropic/OpenAI)
kubectl create secret generic triage-agent-keys -n support-agents \
--from-literal=ANTHROPIC_API_KEY=not-used \
--dry-run=client -o yaml | kubectl apply -f -
kubectl apply -f agents/triage/k8s/deployment.yaml
For deterministic E2E (no LLM calls): Edit agents/triage/k8s/configmap.yaml, set MOCK_LLM: "true", then:
kubectl apply -f agents/triage/k8s/configmap.yaml
kubectl rollout restart deployment/triage-agent -n support-agents
kubectl get pods -n support-agents
kubectl logs -n support-agents -l app=triage-agent -f
If logs show model 'qwen2.5:0.5b' not found, pull the model:
kubectl exec -n support-agents deploy/ollama -- ollama pull qwen2.5:0.5b
From support-resolution-system repo root:
./scripts/e2e-triage.sh
Expected: PASS: Found ticket.triaged for ticket_id=... (with MOCK_LLM, triage classifies as billing, so output goes to ticket.triaged.billing).
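The deterministic behaviour under MOCK_LLM can be pictured with a short sketch. The function names, the `(type, confidence)` return shape, and the mock confidence of 1.0 are assumptions for illustration; only the MOCK_LLM setting and the fixed billing classification come from this README.

```python
import os


def mock_enabled(env=os.environ) -> bool:
    """True when MOCK_LLM is set to "true" (case-insensitive)."""
    return env.get("MOCK_LLM", "").strip().lower() == "true"


def classify(text, llm_call, env=os.environ):
    """Return (ticket_type, confidence) for a ticket body.

    llm_call is a hypothetical callable wrapping the real LLM client.
    With MOCK_LLM enabled it is never invoked, so E2E runs are
    deterministic and need no model or API key.
    """
    if mock_enabled(env):
        return "billing", 1.0  # assumed mock confidence
    return llm_call(text)
```

Passing the environment as a parameter keeps the switch easy to exercise in unit tests without mutating the process environment.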
For the full flow (triage → specialist → ticket.resolved), deploy the billing agent (and optionally technical, feature).
From support-resolution-system repo root:
ECR_URI="${ECR_REGISTRY}/billing-agent:latest" # re-use ECR_REGISTRY from Step 6
aws ecr create-repository --repository-name billing-agent --region $AWS_REGION 2>/dev/null || true
docker build -f agents/billing/Dockerfile -t billing-agent:latest .
docker tag billing-agent:latest $ECR_URI
docker push $ECR_URI
Edit agents/billing/k8s/deployment.yaml: set image to your ECR URI (e.g. 123456789012.dkr.ecr.us-east-1.amazonaws.com/billing-agent:latest).
For E2E: Edit agents/billing/k8s/configmap.yaml, set MOCK_LLM: "true".
kubectl apply -f agents/billing/k8s/configmap.yaml
kubectl create secret generic billing-agent-keys -n support-agents \
--from-literal=ANTHROPIC_API_KEY=not-used \
--dry-run=client -o yaml | kubectl apply -f -
kubectl apply -f agents/billing/k8s/deployment.yaml
kubectl get pods -n support-agents -l app=billing-agent
kubectl logs -n support-agents -l app=billing-agent -f
Repeat the same pattern for technical and feature. If starting a new shell, run export AWS_REGION=us-east-1 and export ECR_REGISTRY=$(aws sts get-caller-identity --query Account --output text).dkr.ecr.us-east-1.amazonaws.com first.
# Technical
aws ecr create-repository --repository-name technical-agent --region $AWS_REGION 2>/dev/null || true
docker build -f agents/technical/Dockerfile -t technical-agent:latest .
docker tag technical-agent:latest ${ECR_REGISTRY}/technical-agent:latest
docker push ${ECR_REGISTRY}/technical-agent:latest
# Edit agents/technical/k8s/deployment.yaml and configmap.yaml
kubectl apply -f agents/technical/k8s/configmap.yaml
kubectl create secret generic technical-agent-keys -n support-agents --from-literal=ANTHROPIC_API_KEY=not-used --dry-run=client -o yaml | kubectl apply -f -
kubectl apply -f agents/technical/k8s/deployment.yaml
# Feature
aws ecr create-repository --repository-name feature-agent --region $AWS_REGION 2>/dev/null || true
docker build -f agents/feature/Dockerfile -t feature-agent:latest .
docker tag feature-agent:latest ${ECR_REGISTRY}/feature-agent:latest
docker push ${ECR_REGISTRY}/feature-agent:latest
# Edit agents/feature/k8s/deployment.yaml and configmap.yaml
kubectl apply -f agents/feature/k8s/configmap.yaml
kubectl create secret generic feature-agent-keys -n support-agents --from-literal=ANTHROPIC_API_KEY=not-used --dry-run=client -o yaml | kubectl apply -f -
kubectl apply -f agents/feature/k8s/deployment.yaml
From support-resolution-system repo root, with triage and billing both running and MOCK_LLM=true for both:
./scripts/e2e-specialists.sh
Expected: PASS: Found ticket.resolved for ticket_id=... with resolved_by=billing.
Requires Kafka and agents running (e.g. in k8s). From a machine that can reach Kafka (or from inside the cluster):
pip install -r tests/requirements.txt
KAFKA_BOOTSTRAP_SERVERS=kafka.confluent.local:9092 pytest tests/e2e/test_full_flow.py -v -s
Integration tests verify components working together. Run unit tests (no external deps):
pip install -r tests/requirements.txt
pytest tests/unit/ -v
Run all tests (integration tests skip when services are unavailable):
pytest tests/ -v

| Test | What it verifies | Requirements |
|---|---|---|
| Event schema | ticket.triaged matches events/ticket.triaged.schema.json | None |
| Agent + Kafka | Agent consumes and produces real Kafka messages | KAFKA_BOOTSTRAP_SERVERS set, Kafka reachable |
| Agent + DynamoDB | Real DynamoDB call works with AWS credentials | DYNAMODB_TABLE + AWS creds, optional DYNAMODB_TEST_CUSTOMER_ID |
| Agent + Ollama | AI returns usable response in expected format | Ollama running, MOCK_LLM unset |
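
One way an integration test could decide whether to skip when Kafka is unavailable is a plain TCP reachability probe. This is a sketch under assumptions, not the repository's actual test helper: `parse_bootstrap` and `kafka_reachable` are hypothetical names, and each entry in KAFKA_BOOTSTRAP_SERVERS is assumed to have the form host:port.

```python
import socket


def parse_bootstrap(servers: str) -> list[tuple[str, int]]:
    """Split "host1:9092,host2:9092" into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def kafka_reachable(servers: str, timeout: float = 2.0) -> bool:
    """Return True if any listed broker accepts a TCP connection.

    This only checks that the port is open; it does not speak the
    Kafka protocol.
    """
    for host, port in parse_bootstrap(servers):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:
            continue
    return False
```

A test module could then guard its Kafka tests with `pytest.mark.skipif(not kafka_reachable(os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "")), reason="Kafka not reachable")`, so the suite still passes on a laptop with no cluster access.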
To enrich tickets with customer data, the triage agent needs DYNAMODB_TABLE and Pod Identity. Infra Terraform creates the table and role. After terraform apply in infra:
terraform -chdir=infra output -raw dynamodb_table_name
Set that value in agents/triage/k8s/configmap.yaml as DYNAMODB_TABLE: "support-customers" (or the output value), then apply and restart triage.
Seed the DynamoDB table with test customers for integration tests and E2E:
DYNAMODB_TABLE=$(terraform -chdir=infra output -raw dynamodb_table_name)
python scripts/seed-dynamodb.py --table "$DYNAMODB_TABLE"
Or use a custom JSON file: python scripts/seed-dynamodb.py --table support-customers --file customers.json
For integration tests, set AUTO_SEED_DYNAMODB=1 to auto-seed before DynamoDB tests run.
The infra Terraform deploys kube-prometheus-stack to the monitoring namespace. Agent pods have prometheus.io/scrape: "true" annotations and are scraped automatically. See docs/observability.md for Grafana access.
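Alongside the Prometheus metrics, the overview lists trace IDs and structured logging. A minimal sketch of how an agent might emit JSON log lines that carry a trace ID is shown here; the field names (`trace_id`, `ticket_id`) and the helper names are assumptions, and docs/observability.md remains the reference for the actual format.

```python
import json
import logging
import uuid


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Values passed via logger.info(..., extra={...}) become
            # attributes on the record; absent ones are logged as null.
            "trace_id": getattr(record, "trace_id", None),
            "ticket_id": getattr(record, "ticket_id", None),
        }
        return json.dumps(payload)


def ensure_trace_id(event: dict) -> str:
    """Reuse the event's trace ID, or assign a new one if it has none."""
    trace_id = event.get("trace_id") or uuid.uuid4().hex
    event["trace_id"] = trace_id
    return trace_id
```

An agent would attach the formatter to a `logging.StreamHandler` and log with `logger.info("ticket triaged", extra={"trace_id": tid, "ticket_id": ticket_id})`. Copying the same trace ID into the produced event lets a ticket be followed from triage through to ticket.resolved.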
| Aspect | Behavior |
|---|---|
| Routing logic | LLM + confidence threshold (CONFIDENCE_THRESHOLD, default 0.7). Below threshold → ticket.triaged.human. |
| Unknown types | No silent drop. LLM returns unknown type → routes to ticket.triaged.human (fallback queue). |
| Model | Default: Ollama qwen2.5:0.5b. For production: LLM_PROVIDER=anthropic with Claude API, or larger Ollama model (e.g. qwen2.5:3b). |
| Human oversight | Low-confidence or unknown classifications → human queue (ticket.triaged.human). |
| Response guardrails | Policy checks before emitting ticket.resolved: max length, forbidden phrases, PII patterns. Violations block produce. |
| Accuracy eval | pytest tests/eval -v -s (requires real LLM, MOCK_LLM unset). Uses tests/eval/fixtures/triage_cases.json. |
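
The routing rule in the table (known type and confidence at or above the threshold goes to the type topic, everything else goes to the human queue) can be sketched as follows. The function names are hypothetical and the set of known types is taken from the topic list in the architecture overview; the triage agent's real implementation may differ.

```python
import os

HUMAN_TOPIC = "ticket.triaged.human"
# Assumed set of routable types, based on the ticket.triaged.* topics above.
KNOWN_TYPES = {"billing", "technical", "feature_request", "other"}


def load_threshold(env=os.environ) -> float:
    """Read CONFIDENCE_THRESHOLD, defaulting to 0.7 as documented."""
    return float(env.get("CONFIDENCE_THRESHOLD", "0.7"))


def route(ticket_type: str, confidence: float, threshold: float) -> str:
    """Pick the output topic for a classified ticket.

    Unknown types and low-confidence classifications are sent to the
    human queue rather than being dropped.
    """
    if ticket_type not in KNOWN_TYPES or confidence < threshold:
        return HUMAN_TOPIC
    return f"ticket.triaged.{ticket_type}"
```

For example, `route("billing", 0.55, load_threshold())` would land in ticket.triaged.human with the default threshold, while the same ticket at confidence 0.9 would go to ticket.triaged.billing.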
- e2e-triage.sh fails: "No ticket.triaged" – Check triage logs: kubectl logs -n support-agents -l app=triage-agent --tail=150. Ensure topics exist (./scripts/create-kafka-topics.sh). With MOCK_LLM, triage returns billing; e2e consumes from ticket.triaged.billing.
- Ollama "model not found" – Run kubectl exec -n support-agents deploy/ollama -- ollama pull qwen2.5:0.5b.
- Kafka connection refused / UnknownHostException – Ensure Step 3 (create-kafka-dns.sh) ran and Kafka services have EXTERNAL-IP. Agents must run in the same VPC (e.g. same EKS cluster).
- Consumer offset past e2e message – Scale triage to 0, reset offsets: kafka-consumer-groups --bootstrap-server kafka.confluent.local:9092 --group triage-agent --topic ticket.events --reset-offsets --to-earliest --execute (from a Kafka tools pod), then scale back to 1.
- Triage agent – Consumes ticket.events, produces to ticket.triaged.*
- Billing agent – Consumes ticket.triaged.billing, produces ticket.resolved
- Technical agent – Consumes ticket.triaged.technical, produces ticket.resolved
- Feature agent – Consumes ticket.triaged.feature_request, produces ticket.resolved
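
Before a specialist produces to ticket.resolved, the response guardrails described earlier (max length, forbidden phrases, PII patterns) are applied. The sketch below shows one possible shape for such a check. The limit, the phrase list, and the regular expressions are illustrative assumptions, not the project's actual policy.

```python
import re

MAX_LENGTH = 2000  # assumed limit, for illustration only
FORBIDDEN_PHRASES = ("guaranteed refund",)  # assumed example phrase
PII_PATTERNS = {
    # Deliberately simple patterns; real PII detection needs more care.
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "card_number": re.compile(r"\b(?:\d[ -]?){13,16}\b"),
}


def check_response(text: str) -> list[str]:
    """Return the list of policy violations; an empty list means OK."""
    violations = []
    if len(text) > MAX_LENGTH:
        violations.append("max_length")
    lowered = text.lower()
    for phrase in FORBIDDEN_PHRASES:
        if phrase in lowered:
            violations.append(f"forbidden_phrase:{phrase}")
    for name, pattern in PII_PATTERNS.items():
        if pattern.search(text):
            violations.append(f"pii:{name}")
    return violations
```

A specialist would call `check_response` on the drafted resolution and skip the produce step when the returned list is non-empty, which matches the "violations block produce" behaviour.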
