
Crypto Risk Engine


A real-time risk control engine for cryptocurrency trading platforms that detects suspicious trading patterns, monitors compliance, and prevents financial losses using Apache Flink CEP (Complex Event Processing).

πŸ“‹ Overview

The Crypto Risk Engine is a sophisticated real-time monitoring system that analyzes cryptocurrency trading activity across multiple exchanges. It combines real-time risk scoring for every trade with complex pattern detection to identify market manipulation, compliance violations, and abnormal trading behavior.

🎯 Key Features

  • Real-time Risk Analysis: Comprehensive risk scoring for every trade within milliseconds
  • Complex Pattern Detection: CEP-based detection of wash trading, spoofing, volume spikes, and flash crashes
  • Multi-exchange Monitoring: Unified risk monitoring across Binance, Coinbase, Kraken, and other exchanges
  • Compliance Enforcement: Automated regulatory compliance checks and reporting
  • Scalable Architecture: Built on Apache Flink for high-throughput, low-latency processing
  • Schema Evolution: Avro-based data contracts for type safety and backward compatibility
  • RocksDB State Backend: High-performance state management with Minio checkpoint storage
  • Elasticsearch Integration: Real-time analytics and dashboarding capabilities

πŸ—οΈ System Architecture

graph TB
    subgraph DataGeneration [Data Generation Layer]
        A[Spring Boot Trade Producer] --> B[Kafka Cluster]
        A --> C[H2 Database]
    end
    
    subgraph Processing [Real-time Processing Layer]
        B --> D[Flink CEP Risk Engine]
        D --> E[Risk Analyzed Trades]
        D --> F[Risk Alerts]
        
        subgraph FlinkInfra [Flink Infrastructure]
            G[RocksDB State Backend] --> D
            H[Minio Checkpoint Storage] --> D
        end
    end
    
    subgraph Storage [Storage & Analytics]
        E --> I[Risk Analyzed Trades Topic]
        F --> J[Risk Alerts Topic]
        I --> K[Elasticsearch Sink Connector]
        K --> L[Elasticsearch Cluster]
        J --> M[Spring Boot Reporting Service]
        L --> M
        M --> N[Web Dashboard]
        M --> O[Mobile Platform]
        C --> P[Trade Audit Trail]
    end
    
    subgraph Infrastructure [Infrastructure]
        Q[Schema Registry] --> B
        Q --> D
        R[Monitoring] --> D
    end
    
    style A fill:#e1f5fe
    style D fill:#f3e5f5
    style E fill:#e8f5e8
    style F fill:#ffebee
    style G fill:#fff3e0
    style H fill:#fff3e0
    style L fill:#fce4ec
    style M fill:#e8f5e8

πŸ“Š Data Flow

sequenceDiagram
    participant TP as Trade Producer
    participant K as Kafka
    participant FE as Flink Engine
    participant CEP as CEP Patterns
    participant RA as Risk Analyzer
    participant SINK as Kafka Sinks
    participant ES as Elasticsearch
    participant RS as Reporting Service
    participant UI as Web/Mobile UI
    
    TP->>K: CryptoTrade (Avro)
    K->>FE: Stream Trades
    FE->>RA: Real-time Risk Analysis
    RA->>SINK: RiskAnalyzedTrade
    FE->>CEP: Pattern Detection
    CEP->>SINK: RiskAlert
    SINK->>K: RiskAnalyzedTrade Topic
    SINK->>K: RiskAlert Topic
    K->>ES: Elasticsearch Sink Connector
    ES->>RS: Risk Data Query
    K->>RS: Real-time Alerts
    RS->>UI: Dashboard Updates

🎭 Demonstration Setup

For demonstration purposes, the system includes a Spring Boot Trade Producer Service that simulates realistic trading activity:

  • Multiple Users: Generates trades from different user accounts with varying risk profiles
  • Multiple Exchanges: Simulates trading across Binance, Coinbase, Kraken, FTX, and KuCoin
  • Realistic Patterns: Includes normal trading, wash trading, spoofing, and volume spikes
  • Configurable Rates: Adjustable trade generation frequency for testing

Trade Producer API Endpoints

POST /api/trades/produce
Content-Type: application/json

{
  "userId": "USER_001",
  "customerId": "CUST_1001", 
  "symbol": "BTC/USDT",
  "side": "BUY",
  "quantity": 0.5,
  "price": 45000.0,
  "orderType": "LIMIT",
  "exchange": "BINANCE"
}
POST /api/admin/mock-trading/start

πŸ”§ Technology Stack

Core Components

  • Apache Flink 1.17.1: Stream processing engine with CEP
  • Apache Kafka 3.4.0: Message bus for event streaming
  • Apache Avro 1.11.4: Data serialization and schema definition (used with Confluent Schema Registry)
  • Spring Boot 3.1.5: Trade producer and reporting service
  • Elasticsearch 8.0+: Real-time analytics and search
  • RocksDB: High-performance state backend for Flink
  • Minio: S3-compatible checkpoint storage
  • H2 Database: In-memory storage for trade audit

Data Contracts

graph LR
    A[Avro Schemas] --> B[Code Generation]
    B --> C[Java Classes]
    C --> D[Trade Producer]
    C --> E[Risk Engine]
    C --> F[Schema Registry]
    
    style A fill:#fff3e0
    style C fill:#e8f5e8

πŸš€ Quick Start

Prerequisites

  • Java 17+ (Spring Boot 3.x requires Java 17; the Flink job itself runs on Java 11+)
  • Apache Kafka 3.4+
  • Docker & Docker Compose
  • Maven 3.6+

1. Clone and Build

git clone https://github.com/codedstreams/crypto-risk-engine.git
cd crypto-risk-engine

# Build data contracts first
cd data-contracts
mvn clean install

# Build trade producer
cd ../trade-producer  
mvn clean package

# Build risk engine
cd ../risk-engine
mvn clean package

2. Start Infrastructure

docker-compose up -d

# Create Kafka topics
kafka-topics.sh --create --topic crypto-trades --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
kafka-topics.sh --create --topic risk-alerts --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
kafka-topics.sh --create --topic risk-analyzed-trades --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

3. Configure Elasticsearch Sink

{
  "name": "risk-analyzed-trades-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "risk-analyzed-trades",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
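
The file is not picked up automatically; it has to be registered with the Kafka Connect REST API (a plain HTTP POST to /connectors). A minimal sketch, assuming the Connect worker is reachable on localhost:8083 and the JSON above is saved as es-sink.json:

// RegisterEsSink.java: posts the connector config above to the Kafka Connect REST API.
// Assumes Connect listens on localhost:8083 and the JSON is saved as es-sink.json.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class RegisterEsSink {
    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("es-sink.json")))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}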

4. Start Services

# Start Trade Producer (Terminal 1)
cd trade-producer
mvn spring-boot:run

# Start Risk Engine (Terminal 2)  
cd risk-engine
flink run target/risk-engine-1.0.0.jar \
  --kafka.bootstrap.servers localhost:9092 \
  --schema.registry.url http://localhost:8081 \
  --minio.endpoint http://localhost:9000 \
  --state.backend.rocksdb true

# Start Reporting Service (Terminal 3)
cd reporting-service
mvn spring-boot:run

5. Generate Test Data

# Start mock trading
curl -X POST http://localhost:8080/api/admin/mock-trading/start

# Check producer stats
curl http://localhost:8080/api/admin/stats

# Access dashboard
open http://localhost:8082/dashboard

🎯 Risk Detection Patterns

1. Volume Spike Detection

graph LR
    A[Trade 1: $1K] --> B[Trade 2: $2.5K]
    B --> C[Trade 3: $6K]
    C --> D[🚨 VOLUME_SPIKE Alert]
    
    style D fill:#ff6b6b

Pattern: 3+ trades within 2 minutes, each with a notional value exceeding 150% of the previous trade's (threshold 1.5)
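
The detection logic lives in the risk-engine module; the following is only a minimal Flink CEP sketch of this pattern, assuming the generated CryptoTrade Avro class exposes getNotional() and getUserId():

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

/** Three consecutive trades where each notional is more than 1.5x the previous,
 *  all within two minutes (mirrors volume.spike.threshold=1.5 and the 2-minute window). */
public class VolumeSpikePattern {

    public static Pattern<CryptoTrade, ?> build() {
        return Pattern.<CryptoTrade>begin("first")
                .next("second")
                .where(growsBy50PercentOver("first"))
                .next("third")
                .where(growsBy50PercentOver("second"))
                .within(Time.minutes(2));
    }

    /** True when the current trade's notional exceeds 1.5x the notional of the
     *  trade matched for the given earlier pattern name. */
    private static IterativeCondition<CryptoTrade> growsBy50PercentOver(String previousPattern) {
        return new IterativeCondition<CryptoTrade>() {
            @Override
            public boolean filter(CryptoTrade trade, Context<CryptoTrade> ctx) throws Exception {
                CryptoTrade previous = ctx.getEventsForPattern(previousPattern).iterator().next();
                return trade.getNotional() > previous.getNotional() * 1.5;
            }
        };
    }
}

The pattern would then be applied per user, e.g. CEP.pattern(trades.keyBy(t -> t.getUserId().toString()), VolumeSpikePattern.build()), with each match mapped to a RiskAlert.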

2. Wash Trading Detection

graph TB
    A[BUY 1000 ADA] --> B[SELL 950 ADA]
    B --> C[BUY 1050 ADA]
    C --> D[🚨 WASH_TRADING Alert]
    
    style D fill:#ff6b6b

Pattern: BUY β†’ SELL β†’ BUY with similar quantities within 1 minute
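
A corresponding minimal CEP sketch, under the same CryptoTrade assumptions; the 10% quantity tolerance mirrors wash.trading.quantity.tolerance=0.1 from the configuration section:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

/** BUY -> SELL -> BUY by the same (keyed) user with similar quantities, inside one minute. */
public class WashTradingPattern {

    public static Pattern<CryptoTrade, ?> build() {
        return Pattern.<CryptoTrade>begin("buy1")
                .where(side("BUY"))
                .next("sell")
                .where(side("SELL"))
                .where(similarQuantityTo("buy1"))
                .next("buy2")
                .where(side("BUY"))
                .where(similarQuantityTo("sell"))
                .within(Time.minutes(1));
    }

    private static SimpleCondition<CryptoTrade> side(String side) {
        return new SimpleCondition<CryptoTrade>() {
            @Override
            public boolean filter(CryptoTrade trade) {
                return side.equals(trade.getSide().toString());
            }
        };
    }

    /** Quantities differ by less than 10% relative to the earlier matched trade. */
    private static IterativeCondition<CryptoTrade> similarQuantityTo(String previousPattern) {
        return new IterativeCondition<CryptoTrade>() {
            @Override
            public boolean filter(CryptoTrade trade, Context<CryptoTrade> ctx) throws Exception {
                CryptoTrade previous = ctx.getEventsForPattern(previousPattern).iterator().next();
                return Math.abs(trade.getQuantity() - previous.getQuantity())
                        / previous.getQuantity() < 0.1;
            }
        };
    }
}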

3. Large Order Detection

graph LR
    A[Single Trade] --> B{Check Thresholds}
    B -->|BTC > $500K| C[🚨 LARGE_ORDER]
    B -->|ETH > $250K| C
    B -->|SOL > $100K| C
    
    style C fill:#ff6b6b
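Pattern: a single trade whose notional value exceeds the per-symbol threshold (see Risk Thresholds under Configuration)

A minimal sketch of the check, with those thresholds hard-coded for illustration:

import java.util.Map;

/** Flags a single trade whose notional exceeds the per-symbol threshold
 *  (BTC $500K, ETH $250K, SOL $100K, from the Risk Thresholds configuration). */
public class LargeOrderCheck {

    private static final Map<String, Double> NOTIONAL_THRESHOLDS = Map.of(
            "BTC/USDT", 500_000.0,
            "ETH/USDT", 250_000.0,
            "SOL/USDT", 100_000.0);

    /** Returns true when the trade's notional exceeds the threshold for its symbol. */
    public static boolean isLargeOrder(CryptoTrade trade) {
        double threshold = NOTIONAL_THRESHOLDS.getOrDefault(
                trade.getSymbol().toString(), Double.MAX_VALUE);
        return trade.getNotional() > threshold;
    }
}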

4. Flash Crash Detection

graph LR
    A[SELL] --> B[SELL] 
    B --> C[SELL]
    C --> D[10+ SELLs in 5min]
    D --> E[🚨 FLASH_CRASH]
    
    style E fill:#ff6b6b
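Pattern: 10+ consecutive SELL trades within 5 minutes

A minimal CEP sketch using a looping pattern, under the same CryptoTrade assumptions as above:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

/** Ten or more consecutive SELL trades for the same key within a five-minute window. */
public class FlashCrashPattern {

    public static Pattern<CryptoTrade, ?> build() {
        return Pattern.<CryptoTrade>begin("sells")
                .where(new SimpleCondition<CryptoTrade>() {
                    @Override
                    public boolean filter(CryptoTrade trade) {
                        return "SELL".equals(trade.getSide().toString());
                    }
                })
                .timesOrMore(10)          // at least ten SELLs
                .consecutive()            // with no other trades in between
                .within(Time.minutes(5)); // all inside a five-minute window
    }
}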

πŸ“ˆ Risk Analysis Framework

Every trade undergoes comprehensive real-time risk analysis:

graph TB
    A[Original Trade] --> B[Volume Analysis]
    A --> C[Behavioral Analysis] 
    A --> D[Market Impact Analysis]
    A --> E[Compliance Check]
    
    B --> F[Risk Score Calculation]
    C --> F
    D --> F
    E --> F
    
    F --> G[Risk Level Assignment]
    G --> H[RiskAnalyzedTrade]
    
    style H fill:#c8e6c9

Risk Score Components

  • Volume Analysis: Spike detection, percentile ranking, Z-scores
  • Behavioral Analysis: Wash trading, spoofing, front-running patterns
  • Market Impact: Liquidity consumption, price slippage, large orders
  • Compliance: Sanctions, AML, KYC, geographic restrictions
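
The exact weights and cut-offs are internal to the engine; the sketch below only illustrates the shape of the calculation, with purely illustrative coefficients and risk-level boundaries:

/** Illustrative sketch: combines the four analysis dimensions (each normalized to [0, 1])
 *  into a single risk score and level. Weights and cut-offs are assumptions, not the
 *  engine's actual coefficients. */
public class RiskScorer {

    public static double riskScore(double volumeScore, double behavioralScore,
                                   double marketImpactScore, double complianceScore) {
        return 0.3 * volumeScore
             + 0.3 * behavioralScore
             + 0.2 * marketImpactScore
             + 0.2 * complianceScore;
    }

    public static String riskLevel(double score) {
        if (score >= 0.8) return "CRITICAL";
        if (score >= 0.6) return "HIGH";
        if (score >= 0.3) return "MEDIUM";
        return "LOW";
    }
}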

πŸ” Monitoring & Alerting

Real-time Dashboards

  • Trade Producer Dashboard: http://localhost:8080/h2-console
  • Flink Web UI: http://localhost:8081
  • Risk Reporting Dashboard: http://localhost:8082/dashboard
  • Elasticsearch Kibana: http://localhost:5601

Key Metrics

# Monitor risk alerts
kafka-console-consumer.sh --topic risk-alerts --bootstrap-server localhost:9092

# Monitor analyzed trades  
kafka-console-consumer.sh --topic risk-analyzed-trades --bootstrap-server localhost:9092

# Check Elasticsearch data
curl http://localhost:9200/risk-analyzed-trades/_search?pretty

# Check Flink job status
curl http://localhost:8081/jobs

βš™οΈ Configuration

Flink State Management

# RocksDB State Backend
state.backend: rocksdb
state.backend.rocksdb: true
state.checkpoints.dir: s3://flink-checkpoints/risk-engine
state.backend.rocksdb.localdir: /tmp/flink/rocksdb

# Minio Checkpoint Configuration
s3.endpoint: http://minio:9000
s3.path.style.access: true
s3.credentials.provider: basic
s3.access-key: minioadmin
s3.secret-key: minioadmin
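
For reference, a minimal sketch of the equivalent programmatic setup inside the Flink job (the 60-second checkpoint interval is an illustrative value, not the project's actual setting):

// Inside the risk-engine job's main(): configure RocksDB state and S3/Minio checkpoints
// programmatically (uses org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                // checkpoint every 60s (illustrative)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));     // incremental RocksDB checkpoints
env.getCheckpointConfig()
   .setCheckpointStorage("s3://flink-checkpoints/risk-engine"); // Minio, via the s3.* settings above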

Risk Engine Parameters

# Kafka Configuration
kafka.bootstrap.servers=localhost:9092
kafka.topic.trades=crypto-trades
kafka.topic.alerts=risk-alerts
kafka.topic.analyzed.trades=risk-analyzed-trades

# Pattern Detection
volume.spike.window.minutes=2
volume.spike.threshold=1.5
wash.trading.window.minutes=1
wash.trading.quantity.tolerance=0.1

# Risk Thresholds  
threshold.btc=500000.0
threshold.eth=250000.0
threshold.sol=100000.0

Trade Producer Configuration

producer:
  mock:
    enabled: true
    rate-ms: 500
    users: ["USER_001", "USER_002", "USER_003"]
    symbols: ["BTC/USDT", "ETH/USDT", "SOL/USDT"]
    exchanges: ["BINANCE", "COINBASE", "KRAKEN"]

πŸ—ƒοΈ Data Schemas

CryptoTrade Schema

{
  "tradeId": "unique-id",
  "userId": "USER_001", 
  "symbol": "BTC/USDT",
  "side": "BUY",
  "quantity": 0.5,
  "price": 45000.0,
  "notional": 22500.0,
  "timestamp": 1672531200000,
  "exchange": "BINANCE",
  "riskLevel": "LOW"
}
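
Assuming the data-contracts module generates a SpecificRecord class with a builder for this schema (and that the string-like fields map to Avro strings rather than enums), the trade producer can build and publish a record roughly like this, where kafkaTemplate is an assumed KafkaTemplate<String, CryptoTrade> wired to the Confluent Avro serializer:

// Illustrative sketch: build one CryptoTrade with the generated Avro builder
// and publish it, keyed by tradeId, to the crypto-trades topic.
CryptoTrade trade = CryptoTrade.newBuilder()
        .setTradeId(java.util.UUID.randomUUID().toString())
        .setUserId("USER_001")
        .setSymbol("BTC/USDT")
        .setSide("BUY")
        .setQuantity(0.5)
        .setPrice(45000.0)
        .setNotional(0.5 * 45000.0)               // quantity * price
        .setTimestamp(System.currentTimeMillis())
        .setExchange("BINANCE")
        .setRiskLevel("LOW")
        .build();

kafkaTemplate.send("crypto-trades", trade.getTradeId().toString(), trade);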

RiskAnalyzedTrade Schema

{
  "originalTrade": {...},
  "riskScore": 0.75,
  "finalRiskLevel": "HIGH", 
  "triggeredPatterns": ["VOLUME_SPIKE", "LARGE_ORDER"],
  "volumeAnalysis": {...},
  "behavioralAnalysis": {...},
  "complianceFlags": ["TAX_REPORTING_REQUIRED"],
  "riskMitigationActions": ["MANUAL_REVIEW_REQUIRED"]
}

🚨 Alert Examples

Volume Spike Alert

{
  "alertId": "alert-123",
  "patternType": "VOLUME_SPIKE", 
  "userId": "USER_001",
  "severity": "HIGH",
  "description": "User exhibited abnormal trading volume spike within short period",
  "recommendedAction": "Temporarily limit user's trading limits and require manual review"
}

Wash Trading Alert

{
  "alertId": "alert-456",
  "patternType": "WASH_TRADING",
  "userId": "USER_002", 
  "severity": "CRITICAL",
  "description": "Potential wash trading detected with rapid buy-sell cycles",
  "recommendedAction": "Freeze account for investigation and report to compliance"
}

πŸ”’ Production Deployment

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: risk-engine
spec:
  replicas: 3
  selector:
    matchLabels:
      app: risk-engine
  template:
    metadata:
      labels:
        app: risk-engine
    spec:
      containers:
      - name: risk-engine
        image: crypto-risk-engine:1.0.0
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka-cluster:9092"
        - name: SCHEMA_REGISTRY_URL
          value: "http://schema-registry:8081"
        - name: STATE_BACKEND_ROCKSDB
          value: "true"
        - name: MINIO_ENDPOINT
          value: "http://minio:9000"

High Availability

  • Flink Checkpointing: Minio S3 storage for state recovery with RocksDB backend
  • Kafka Partitioning: Horizontal scaling across partitions
  • Schema Registry: Avro schema evolution and compatibility
  • Elasticsearch Cluster: Distributed search and analytics

πŸ§ͺ Testing

Unit Tests

# Run all tests
mvn test

# Run specific test suite
mvn test -Dtest=TradeRiskAnalyzerTest
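
A sketch of what a test in that suite could look like; the TradeRiskAnalyzer constructor, analyze() method, and RiskAnalyzedTrade accessors shown here are assumptions to adapt to the actual API in the risk-engine module:

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

// Hypothetical sketch: a trade far above the $500K BTC large-order threshold
// should receive an elevated risk score.
class TradeRiskAnalyzerTest {

    @Test
    void largeBtcOrderGetsElevatedRiskScore() {
        CryptoTrade trade = CryptoTrade.newBuilder()
                .setTradeId("t-1")
                .setUserId("USER_001")
                .setSymbol("BTC/USDT")
                .setSide("BUY")
                .setQuantity(15.0)
                .setPrice(45_000.0)
                .setNotional(675_000.0)   // well above the BTC threshold
                .setTimestamp(System.currentTimeMillis())
                .setExchange("BINANCE")
                .setRiskLevel("LOW")
                .build();

        RiskAnalyzedTrade result = new TradeRiskAnalyzer().analyze(trade);

        assertTrue(result.getRiskScore() > 0.5);
    }
}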

Integration Testing

# Start test environment
docker-compose -f docker-compose.test.yml up

# Run integration tests
mvn verify -Pintegration-test

Performance Testing

# Generate high load
curl -X POST http://localhost:8080/api/admin/mock-trading/start?rateMs=100

# Monitor performance
curl http://localhost:8081/jobs/metrics

# Check Elasticsearch performance
curl http://localhost:9200/_cluster/health?pretty

🀝 Contributing

  1. Fork the repository
  2. Create feature branch: git checkout -b feature/new-pattern
  3. Commit changes: git commit -am 'Add new risk pattern'
  4. Push to branch: git push origin feature/new-pattern
  5. Submit pull request

Development Guidelines

  • Follow Avro schema evolution best practices
  • Include comprehensive unit tests
  • Update documentation for new features
  • Use descriptive commit messages

πŸ“š Additional Resources

πŸ†˜ Support

πŸ“„ License


πŸ‘¨β€πŸ’» Project Lead

Nestor A. A.
Senior Software & Data Streaming Engineer

  • πŸ“§ Email: nestorabiawuh@gmail.com
  • πŸ’Ό LinkedIn: Nestor Abiangang
  • 🏒 Role: Lead Developer - Crypto Risk Engine
  • πŸ”§ Specialties: Backend Engineering, Real-time Stream Processing, Risk Management Systems, Distributed Systems

Built with ❀️ by CodedStreams Engineering

Last Updated: October 2025
