A production-pattern learning project demonstrating how to anchor Web2 data integrity proofs on an EVM blockchain using Java (Spring Boot), Web3j, Apache Kafka, and PostgreSQL — inspired by blockchain adoption in consortium O&G and energy supply chains.
- The Dual-State Design Pattern
- Why Kafka Protects Web2 API Performance from Web3 Block Times
- System Architecture
- Project Structure
- Prerequisites
- Quick Start
- Step-by-Step Setup
- API Reference
- Reconciliation Engine — How Tamper Detection Works
- Component Deep Dive
- Key Design Decisions
- Extending This Project
Enterprise data systems, especially in regulated industries like Oil & Gas, pharmaceuticals, and logistics, can store up to hundreds of kilobytes of structured metadata per event (telemetry readings, custody transfers, compliance attestations). This data must be:
- Mutable: measurement corrections, operator updates, and reconciliation adjustments are legitimate.
- Auditable: at any point in time, you must be able to prove the data has not been tampered with offline.
- Fast to query: relational databases provide indexing, joins, and full-text search.
- Cost-effective: storing 1 MB of O&G payload directly in Ethereum calldata would consume roughly 16 million gas for the calldata alone (16 gas per non-zero byte under EIP-2028), which at mainnet gas prices can cost hundreds of dollars per record.
Blockchains are excellent at immutable commitment but terrible at cheap data storage. Relational databases are excellent at cheap, queryable storage but have no cryptographic accountability mechanism.
The Dual-State pattern splits responsibility cleanly:
┌─────────────────────────────────────────────────────────────────────────┐
│ WEB2 STATE (PostgreSQL) │
│ "The full truth" — all business data, mutable, queryable │
│ │
│ assetId │ cargoType │ originFacility │ ... │ dataHash │ txHash │
│ ───────────────────────────────────────────────────────────────────── │
│ PIPE-001 │ NGL │ Offshore-B12 │ ... │ 0x3a4b... │ 0xfa2c... │
└────────────────────────────────┬────────────────────────────────────────┘
│ SHA-256 of canonical JSON
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ WEB3 STATE (EVM / AssetTracker.sol) │
│ "The cryptographic commitment" — hash only, immutable, public │
│ │
│ mapping: assetId → { owner: 0x..., dataHash: bytes32, timestamp: uint }│
└─────────────────────────────────────────────────────────────────────────┘
The invariant: At any time, SHA-256(canonicalJson(dbRecord)) == onChainDataHash.
If this invariant fails during reconciliation, it means someone modified the PostgreSQL record without going through the authorised update path (which would require them to also update the on-chain hash using the owner's private key — computationally infeasible without the key).
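A minimal plain-Java sketch of the invariant check (JDK 17+, no Web3j). It assumes the `dataHash` column stores the digest as a `0x`-prefixed lowercase hex string, as in the API examples; the class and method names are illustrative, not the project's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

/** Illustrative helper (hypothetical name) for SHA-256(canonicalJson(dbRecord)) == onChainDataHash. */
class DualStateInvariant {

    /** SHA-256 over the UTF-8 bytes of the canonical JSON; always 32 bytes, i.e. one Solidity bytes32. */
    static byte[] sha256(String canonicalJson) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(canonicalJson.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // Every compliant JVM must provide SHA-256, so this should never happen.
            throw new IllegalStateException(e);
        }
    }

    /** Renders a digest in the 0x-prefixed hex form assumed for the dataHash column. */
    static String toHex(byte[] digest) {
        return "0x" + HexFormat.of().formatHex(digest);
    }

    /** True when the freshly computed hash equals the hash read back from the chain. */
    static boolean invariantHolds(String canonicalJson, byte[] onChainDataHash) {
        // MessageDigest.isEqual examines every byte of both arrays before answering.
        return MessageDigest.isEqual(sha256(canonicalJson), onChainDataHash);
    }

    public static void main(String[] args) {
        String json = "{\"assetId\":\"PIPE-001\"}";
        byte[] anchored = sha256(json);                                   // what the worker would anchor
        System.out.println(toHex(anchored));
        System.out.println(invariantHolds(json, anchored));               // true
        System.out.println(invariantHolds(json.replace("001", "002"), anchored)); // false: row changed
    }
}
```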
| Data | Location | Rationale |
|---|---|---|
| Full shipment payload (1–100 KB) | PostgreSQL | Fast queries, cheap storage, supports updates |
| SHA-256 hash (32 bytes) | Ethereum / Anvil | Immutable commitment; one small transaction per anchor (21,000 gas base cost plus roughly 20,000 gas per newly written storage slot) |
| Transaction hash | PostgreSQL (reference) | Allows block explorer lookup |
| Ownership (Ethereum address) | Ethereum | Native to EVM security model |
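The "hash only" split follows from EIP-2028 calldata pricing: 16 gas per non-zero byte and 4 per zero byte. The sketch below counts only that calldata component for a 1 MB payload versus a 32-byte hash; it deliberately ignores the 21,000-gas base cost, ABI-encoding overhead, and storage writes. The class name is illustrative.

```java
import java.util.Arrays;

/** Illustrative calculator (hypothetical name) for the calldata portion of transaction gas (EIP-2028). */
class CalldataGas {
    static final long GAS_PER_ZERO_BYTE = 4;
    static final long GAS_PER_NONZERO_BYTE = 16;

    static long calldataGas(byte[] data) {
        long gas = 0;
        for (byte b : data) {
            gas += (b == 0) ? GAS_PER_ZERO_BYTE : GAS_PER_NONZERO_BYTE;
        }
        return gas;
    }

    public static void main(String[] args) {
        byte[] hash = new byte[32];
        Arrays.fill(hash, (byte) 0x3a);            // stand-in for a SHA-256 digest
        byte[] payload = new byte[1_000_000];
        Arrays.fill(payload, (byte) 'x');          // stand-in for 1 MB of JSON text (no zero bytes)
        System.out.println("hash calldata gas:    " + calldataGas(hash));    // 512
        System.out.println("payload calldata gas: " + calldataGas(payload)); // 16000000
    }
}
```

Sixteen million gas of calldata is a large fraction of an entire mainnet block, while the hash needs a few hundred gas of calldata on top of the fixed per-transaction costs.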
| Network | Avg. Block Time | TX Confirmation Wait |
|---|---|---|
| Anvil (local, this project) | 2 seconds | ~2–4 seconds |
| Ethereum Mainnet | ~12 seconds | 12–60 seconds |
| Polygon | ~2 seconds | 2–10 seconds |
| Private Hyperledger Besu | 1–5 seconds | Variable |
If the Spring Boot REST endpoint waited for a mined transaction receipt before returning an HTTP response, the API would be unusable — client timeouts are typically 30 seconds, and under load, you would exhaust the Tomcat thread pool waiting for block confirmations.
HTTP Thread (< 50ms) Kafka Partition Web3Worker Thread
───────────────────── ────────────────── ────────────────────────────
POST /api/v1/shipments
│
├─ Persist to PostgreSQL ──────────────────────────────────────────────────>
│ (synchronous, fast)
│
├─ Publish to Kafka ──> [SHIP-001 event]
│ (async callback) ┌──< consume(SHIP-001)
│ │ sha256(canonicalJson)
└─ Return 202 Accepted │ anchorAsset(id, hash)
(HTTP response sent) │ await TransactionReceipt
│ (blocks here, not in HTTP)
└──> update DB: ANCHORED
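To make the thread separation concrete, here is a plain-Java simulation in which a `LinkedBlockingQueue` stands in for the Kafka partition, a map stands in for PostgreSQL, and a lambda stands in for the on-chain call. It is a teaching sketch under those assumptions, not the project's Kafka or Web3j code; all names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical simulation: the HTTP path enqueues and returns; a worker drains and "anchors". */
class DecoupledIngestion {
    final Map<String, String> statusByAsset = new ConcurrentHashMap<>(); // stands in for PostgreSQL
    final BlockingQueue<String> partition = new LinkedBlockingQueue<>();   // stands in for the Kafka partition
    private final Consumer<String> anchorOnChain;                         // stands in for tx + receipt wait

    DecoupledIngestion(Consumer<String> anchorOnChain) {
        this.anchorOnChain = anchorOnChain;
    }

    /** HTTP thread: persist as PENDING, publish, and return at once (the 202 Accepted path). */
    String ingest(String assetId) {
        statusByAsset.put(assetId, "PENDING");
        partition.add(assetId);
        return "PENDING";
    }

    /** Worker thread: blocks on the next event and on the chain call, never on an HTTP request. */
    boolean processNext() {
        try {
            String assetId = partition.take();
            anchorOnChain.accept(assetId);
            statusByAsset.put(assetId, "ANCHORED");
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DecoupledIngestion system = new DecoupledIngestion(id -> {
            try {
                Thread.sleep(2_000);               // simulated 2-second block time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread worker = new Thread(system::processNext);
        worker.start();
        long start = System.nanoTime();
        String status = system.ingest("SHIP-001");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("HTTP path returned " + status + " after " + elapsedMs + " ms");
        worker.join();
        System.out.println("After worker: " + system.statusByAsset.get("SHIP-001"));
    }
}
```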
Key architectural properties:

- HTTP response time is O(DB write) + O(Kafka publish), typically under 10 ms combined, because the Kafka send is asynchronous. On-chain latency never appears in the HTTP response time; the client only observes it as time spent in `PENDING`.
- Back-pressure is natural. If the EVM node is slow, the consumer thread stays busy awaiting the current receipt and simply does not poll again; with `max.poll.records=1` it holds at most one record at a time. New ingestion events accumulate in the Kafka partition, not in memory or HTTP queues, so the partition acts as a durable, ordered buffer. The receipt wait must stay below `max.poll.interval.ms`; otherwise the consumer is considered failed and the group rebalances.
- Fault isolation. The Ethereum node can go down entirely while Web2 ingestion continues. When the node recovers, Kafka consumers resume from their last committed offset and process the backlog.
- Per-asset ordering. `assetId` is the Kafka message key, so all events for the same asset land on the same partition and are processed in arrival order.
- Observability. The Kafka UI (http://localhost:8090) shows message backlog, consumer lag, and partition assignment, making it a real-time view of the Web3 processing queue.
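A plain-Java sketch of the consumer settings these properties rely on. The keys are standard Kafka consumer configs (`max.poll.records`, `max.poll.interval.ms`, `enable.auto.commit`); the values and the receipt-polling budgets (attempts × interval) are illustrative assumptions, not the project's actual configuration. The helper checks that the worst-case receipt wait fits inside `max.poll.interval.ms`, whose Kafka default is 300000 ms (5 minutes).

```java
import java.util.Properties;

/** Hypothetical helper: consumer settings for a one-transaction-at-a-time Web3 worker. */
class Web3ConsumerSettings {

    static Properties consumerProperties(long maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("max.poll.records", "1");                                  // one tx per poll cycle
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs)); // upper bound per cycle
        props.setProperty("enable.auto.commit", "false");                            // assumed: commit after the DB update
        return props;
    }

    /** True if the worst-case receipt wait (attempts x interval) fits inside one poll interval. */
    static boolean receiptWaitFits(long maxPollIntervalMs, int pollingAttempts, long pollingIntervalMs) {
        long worstCaseWaitMs = (long) pollingAttempts * pollingIntervalMs;
        return worstCaseWaitMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // Kafka's default
        System.out.println(consumerProperties(maxPollIntervalMs));
        // Assumed budgets: 30 polls every 2 s suits a 2-second Anvil chain; 40 polls every 15 s does not fit.
        System.out.println(receiptWaitFits(maxPollIntervalMs, 30, 2_000));  // true  (60 s)
        System.out.println(receiptWaitFits(maxPollIntervalMs, 40, 15_000)); // false (600 s)
    }
}
```

Check your Web3j version's actual receipt-polling defaults before relying on either budget.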
POST /shipments
│
▼
[ PENDING ] ─── Kafka event published
│
┌─────────┴──────────┐
│ │
tx mined OK tx failed
│ │
▼ ▼
[ ANCHORED ] [ FAILED ]
│
reconciliation detects
hash mismatch
│
▼
[ HASH_MISMATCH ] ← ALERT: tamper detected
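The lifecycle above can be encoded as a small transition table so that illegal moves (for example, `ANCHORED` back to `PENDING`) are easy to reject. This is a sketch: the constants match the status values used by the API, but the transition methods are hypothetical, and only the transitions drawn in the diagram are allowed (there is no retry path out of `FAILED`).

```java
import java.util.EnumSet;
import java.util.Set;

/** Anchor lifecycle as drawn above; allowedNext/canTransitionTo are illustrative helpers. */
enum AnchorStatus {
    PENDING, ANCHORED, FAILED, HASH_MISMATCH;

    Set<AnchorStatus> allowedNext() {
        switch (this) {
            case PENDING:  return EnumSet.of(ANCHORED, FAILED);       // tx mined OK / tx failed
            case ANCHORED: return EnumSet.of(HASH_MISMATCH);          // reconciliation detects tampering
            default:       return EnumSet.noneOf(AnchorStatus.class); // terminal in this diagram
        }
    }

    boolean canTransitionTo(AnchorStatus next) {
        return allowedNext().contains(next);
    }

    public static void main(String[] args) {
        System.out.println(PENDING.canTransitionTo(ANCHORED)); // true
        System.out.println(ANCHORED.canTransitionTo(PENDING)); // false
    }
}
```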
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Dual-State Asset Ledger │
│ │
│ ┌──────────────┐ POST ┌───────────────────────────────────────────────┐ │
│ │ API Client │ ──────────> │ ShipmentController (Spring MVC REST) │ │
│ │ (curl/ │ │ • Validates assetId uniqueness │ │
│ │ Postman) │ <────────── │ • Persists to PostgreSQL │ │
│ └──────────────┘ 202 Accept │ • Publishes Kafka event │ │
│ └────────────────────────────────────────────── ┘ │
│ │ publish │
│ ▼ │
│ ┌─────────────────────────┐ │
│ │ Apache Kafka │ │
│ │ Topic: shipment.ingested│ │
│ │ Key: assetId │ │
│ └─────────────┬───────────┘ │
│ │ consume │
│ ▼ │
│ ┌──────────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ PostgreSQL │ <──────>│ Web3Worker (Kafka @KafkaListener) │ │
│ │ ledger_db │ │ • Reloads full payload from DB │ │
│ │ shipment_ │ │ • Builds canonical JSON │ │
│ │ payloads table │ │ • SHA-256 hash computation │ │
│ └──────────────────┘ │ • RawTransactionManager (signed tx) │ │
│ ▲ │ • Calls AssetTracker.anchorAsset() │ │
│ │ update │ • Awaits TransactionReceipt │ │
│ │ ANCHORED │ • Updates DB: status, txHash │ │
│ └──────────────────└─────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────┐ ▼ │
│ │ ReconciliationEngine │ ┌─────────────────────────────────────────┐ │
│ │ (@Scheduled cron) │ │ Anvil (Local EVM Node) │ │
│ │ • Queries all ANCHORED │ │ • Chain ID: 31337 │ │
│ │ records from PostgreSQL │ │ • Block time: 2 seconds │ │
│ │ • Recomputes SHA-256 │ │ • Pre-funded accounts │ │
│ │ • eth_call getAsset() ───┼─>│ │ │
│ │ • 3-way hash comparison │ │ AssetTracker.sol deployed here │ │
│ │ • Marks HASH_MISMATCH │ │ • anchorAsset(string, bytes32) │ │
│ │ if tamper detected │ │ • getAsset(string) view │ │
│ └────────────────────────────┘ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘
dual-state-ledger/
├── docker-compose.yml # PostgreSQL + Zookeeper + Kafka + Anvil
├── .gitignore
├── README.md # This file
│
├── contracts/
│ ├── AssetTracker.sol # Solidity smart contract
│ └── README.md # Compile, deploy, and wrapper generation guide
│
└── backend/
├── pom.xml # Maven: Spring Boot + Web3j + Kafka + PostgreSQL
└── src/main/
├── resources/
│ └── application.properties # All runtime configuration
└── java/com/enterprise/ledger/
├── BackendApplication.java # @SpringBootApplication entry point
├── config/
│ ├── LedgerInfraConfig.java # Web3j + Credentials Spring Beans
│ └── KafkaConsumerConfig.java # Typed Kafka consumer factory
├── model/
│ └── ShipmentPayload.java # JPA entity (O&G metadata + chain state)
├── repository/
│ └── ShipmentRepository.java # Spring Data JPA queries
├── controller/
│ └── ShipmentController.java # REST API (POST, GET endpoints)
├── producer/
│ └── KafkaEventProducer.java # Async Kafka publisher
├── consumer/
│ └── Web3Worker.java # Kafka listener → SHA-256 → on-chain tx
├── scheduler/
│ └── ReconciliationEngine.java # Cron job: hash verification
└── contract/
└── AssetTracker.java # Web3j Java wrapper for AssetTracker.sol
| Tool | Version | Purpose |
|---|---|---|
| Java JDK | 17+ | Backend compilation and runtime |
| Maven | 3.9+ | Build tool |
| Docker Desktop | 24+ | Runs all infrastructure services |
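| Foundry (`anvil`, `cast`, `forge`) | Latest | Compile and deploy Solidity; local EVM node |
| web3j CLI | Latest | Regenerate Java wrapper from ABI (optional) |
| solc | Compatible with the contract's `pragma` | Used by the `solc --abi --bin` commands below |
| jq | Any | Pretty-prints API responses in the curl examples (optional) |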
Install Foundry:

```bash
curl -L https://foundry.paradigm.xyz | bash
# Open a new terminal (or source your shell profile) so that foundryup is on PATH, then:
foundryup
```

Install web3j CLI:

```bash
curl -L https://get.web3j.io | sh && source ~/.web3j/source.sh
```

Then run the whole stack end to end:

```bash
# 1. Clone and enter the project
git clone <your-repo-url> && cd dual-state-ledger

# 2. Start all infrastructure (PostgreSQL, Kafka, Anvil)
docker compose up -d

# 3. Compile and deploy the smart contract.
#    The key below is Anvil's default account #0: a publicly known test key, never use it on a real network.
#    Recent Foundry releases only dry-run `forge create` unless --broadcast is passed.
cd contracts
solc --abi --bin AssetTracker.sol -o ./build/ --overwrite
forge create --rpc-url http://localhost:8545 \
  --private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 \
  --broadcast \
  AssetTracker.sol:AssetTracker

# 4. Copy the "Deployed to:" address into backend/src/main/resources/application.properties:
#    web3j.contract.address=0x<OUTPUT_FROM_ABOVE>
cd ..

# 5. Build and run the Spring Boot application (it stays in the foreground)
cd backend
mvn clean package -DskipTests
java -jar target/dual-state-ledger-1.0.0-SNAPSHOT.jar

# 6. In a second terminal, ingest a test shipment
curl -s -X POST http://localhost:8080/api/v1/shipments \
  -H "Content-Type: application/json" \
  -d '{
    "assetId": "PIPE-SEG-2024-NGL-001",
    "cargoType": "NaturalGasLiquid",
    "cargoWeightKg": 48250.5,
    "originFacility": "Offshore-Platform-Bravo",
    "destinationFacility": "Onshore-Terminal-Delta",
    "operatorId": "OP-CONSORTIUM-7",
    "pipelineSegment": "NGL-MAIN-A",
    "temperatureCelsius": -42.3,
    "pressureBar": 85.7,
    "geoLat": 57.9821,
    "geoLon": 1.7241
  }' | jq .

# 7. Poll until status = ANCHORED (~3-5 seconds with Anvil's 2-second block time)
curl -s http://localhost:8080/api/v1/shipments/PIPE-SEG-2024-NGL-001 | jq '{anchorStatus, dataHash, onChainTxHash}'
```

Start the infrastructure and confirm it is healthy:

```bash
docker compose up -d
docker compose ps   # All services should show "healthy"
```

| Service | URL | Purpose |
|---|---|---|
| PostgreSQL | localhost:5432 | Web2 relational store |
| Kafka | localhost:9092 | Async event bus |
| Kafka UI | http://localhost:8090 | Message browser and consumer lag monitor |
| Anvil (EVM) | http://localhost:8545 | Local blockchain RPC endpoint |
Deploy the contract (see contracts/README.md for detailed instructions):

```bash
# Quick deploy with Foundry (Anvil's public test key; recent Foundry needs --broadcast to actually send the tx)
cd contracts
forge create --rpc-url http://localhost:8545 \
  --private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 \
  --broadcast \
  AssetTracker.sol:AssetTracker
```

Copy the `Deployed to:` address from the output.
Edit `backend/src/main/resources/application.properties`:

```properties
web3j.contract.address=0x<YOUR_DEPLOYED_CONTRACT_ADDRESS>
```

Build and run the backend:

```bash
cd backend
mvn clean package -DskipTests
java -jar target/dual-state-ledger-1.0.0-SNAPSHOT.jar
```

Or run directly via Maven for development:

```bash
mvn spring-boot:run
```

`POST /api/v1/shipments` persists the payload to PostgreSQL, publishes a Kafka event, and returns immediately.
Request body:

```json
{
  "assetId": "PIPE-SEG-2024-NGL-001",
  "cargoType": "NaturalGasLiquid",
  "cargoWeightKg": 48250.5,
  "originFacility": "Offshore-Platform-Bravo",
  "destinationFacility": "Onshore-Terminal-Delta",
  "operatorId": "OP-CONSORTIUM-7",
  "pipelineSegment": "NGL-MAIN-A",
  "temperatureCelsius": -42.3,
  "pressureBar": 85.7,
  "geoLat": 57.9821,
  "geoLon": 1.7241
}
```

Response: `202 Accepted`
```json
{
  "id": 1,
  "assetId": "PIPE-SEG-2024-NGL-001",
  "anchorStatus": "PENDING",
  "dataHash": null,
  "onChainTxHash": null,
  "ingestionTimestamp": "2024-01-15T10:30:00.000Z",
  "..."
}
```

`GET /api/v1/shipments/{assetId}` returns a single record; poll it to check when `anchorStatus` transitions to `ANCHORED`.
Response when anchored: `200 OK`

```json
{
  "assetId": "PIPE-SEG-2024-NGL-001",
  "anchorStatus": "ANCHORED",
  "dataHash": "0x3a4b2c1f...",
  "onChainTxHash": "0xfa2c8b...",
  "..."
}
```

The list endpoint returns all records regardless of status.
`GET /api/v1/shipments/status/{status}` returns the records in one status. Valid status values: `PENDING`, `ANCHORED`, `FAILED`, `HASH_MISMATCH`.

```bash
# Find all records awaiting chain confirmation
curl http://localhost:8080/api/v1/shipments/status/PENDING

# Find tamper-detected records (investigation required)
curl http://localhost:8080/api/v1/shipments/status/HASH_MISMATCH
```

The health endpoint includes PostgreSQL and Kafka connectivity checks.
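A client that must wait for anchoring can poll `GET /api/v1/shipments/{assetId}` until it sees a terminal status. The sketch below uses only the JDK's `java.net.http.HttpClient`; the class, the regex-based status extraction (a JSON library is preferable in real code), and the attempt/interval values are assumptions. The polling loop takes a `Supplier` so it can be exercised without a running server.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical client-side poller for GET /api/v1/shipments/{assetId}. */
class AnchorStatusPoller {
    private static final Pattern STATUS = Pattern.compile("\"anchorStatus\"\\s*:\\s*\"([A-Z_]+)\"");
    static final Set<String> TERMINAL = Set.of("ANCHORED", "FAILED", "HASH_MISMATCH");

    /** Naive extraction for this sketch only. */
    static Optional<String> extractStatus(String json) {
        Matcher m = STATUS.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Polls until a terminal status or until maxAttempts is exhausted; returns the last status seen. */
    static String pollUntilTerminal(Supplier<String> fetchBody, int maxAttempts, long sleepMs) {
        String last = "UNKNOWN";
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            last = extractStatus(fetchBody.get()).orElse("UNKNOWN");
            if (TERMINAL.contains(last) || attempt == maxAttempts) {
                break;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8080/api/v1/shipments/PIPE-SEG-2024-NGL-001")).GET().build();
        Supplier<String> fetch = () -> {
            try {
                return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        };
        System.out.println(pollUntilTerminal(fetch, 10, 1_000));
    }
}
```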
The ReconciliationEngine runs on a cron schedule (default: every 5 minutes) and performs a
three-way integrity check on every ANCHORED record:
```text
For each ANCHORED record in PostgreSQL:

  Step 1: Rebuild canonical JSON from the live DB row
          (same alphabetically ordered field set used during original anchoring)
  Step 2: recomputedHash = SHA-256(canonicalJson)
  Step 3: onChainHash = eth_call AssetTracker.getAsset(assetId).dataHash
          (read-only EVM call, no gas consumed)
  Step 4: Verify two invariants:
          (A) Arrays.equals(recomputedHash, onChainHash)
              → "The live DB row still matches the on-chain commitment"
          (B) toHex(recomputedHash).equals(storedDataHash)
              → "The dataHash column itself was not silently modified"

  PASS: Both invariants hold   → log DEBUG "INTEGRITY OK"
  FAIL: Either invariant fails → set status=HASH_MISMATCH, log WARN with full diff
```
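Step 4 can be sketched as a pure function over the three values, assuming the stored `dataHash` is a `0x`-prefixed hex string. The class and messages are hypothetical; in the project the two hashes come from Steps 2 and 3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HexFormat;
import java.util.List;

/** Hypothetical three-way comparison mirroring Step 4 above. */
class ThreeWayCheck {

    /** Returns the broken invariants; an empty list means INTEGRITY OK. */
    static List<String> violations(byte[] recomputedHash, byte[] onChainHash, String storedDataHash) {
        List<String> broken = new ArrayList<>();
        if (!Arrays.equals(recomputedHash, onChainHash)) {
            broken.add("A: live DB row no longer matches the on-chain commitment");
        }
        String recomputedHex = "0x" + HexFormat.of().formatHex(recomputedHash);
        if (!recomputedHex.equalsIgnoreCase(storedDataHash)) {   // a null column also counts as broken
            broken.add("B: dataHash column differs from the recomputed hash");
        }
        return broken;
    }

    public static void main(String[] args) {
        byte[] anchored = new byte[32];
        anchored[0] = 0x3a;
        String stored = "0x" + HexFormat.of().formatHex(anchored);
        System.out.println(violations(anchored, anchored.clone(), stored));  // []
        byte[] tampered = anchored.clone();
        tampered[31] = 1;                                                     // row edited in place
        System.out.println(violations(tampered, anchored, stored).size());   // 2
    }
}
```

The pattern of failures is diagnostic: an edited row breaks both A and B; an edited `dataHash` column alone breaks only B; a row edited together with a matching rewrite of `dataHash` breaks only A, which is exactly the case the on-chain copy exists to catch.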
The SHA-256 hash is computed over a deterministic canonical JSON string, not raw bytes or Jackson-serialised JSON (whose output can vary with library version, configuration, or field order). The canonical form uses:

- Alphabetical key ordering: independent of Java class field declaration order.
- No insignificant whitespace: a single compact JSON object with no spaces between tokens and no trailing newline.
- ISO-8601 UTC for timestamps: `Instant.toString()` always renders in UTC.
- Explicit null literals: null numeric fields appear as `null`, not absent.

Example canonical string:

```json
{"assetId":"PIPE-SEG-2024-NGL-001","cargoType":"NaturalGasLiquid","cargoWeightKg":48250.5,"destinationFacility":"Onshore-Terminal-Delta","geoLat":57.9821,"geoLon":1.7241,"ingestionTimestamp":"2024-01-15T10:30:00Z","operatorId":"OP-CONSORTIUM-7","originFacility":"Offshore-Platform-Bravo","pipelineSegment":"NGL-MAIN-A","pressureBar":85.7,"temperatureCelsius":-42.3}
```

Critical: `Web3Worker.buildCanonicalJson()` and `ReconciliationEngine.buildCanonicalJson()` must produce byte-for-byte identical output. Any deviation (even a trailing space) produces a different hash and a false-positive tamper alert.
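One way to remove that risk is a single shared canonicalizer that both classes call. The sketch below is an assumption, not the project's `buildCanonicalJson()`: it takes a flat map of field name to value and handles only strings, numbers, `Instant`s, and nulls. Numbers are rendered with `Double.toString`, which switches to scientific notation below 10^-3 or at 10^7 and above; that is acceptable only because both sides use the same code.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical shared canonicalizer: sorted keys, compact output, explicit nulls. */
final class CanonicalJson {
    private CanonicalJson() {}

    static String build(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        // TreeMap sorts keys with String.compareTo: alphabetical for these ASCII camelCase names (case-sensitive).
        for (Map.Entry<String, Object> e : new TreeMap<>(fields).entrySet()) {
            if (!first) sb.append(',');
            first = false;
            appendString(sb, e.getKey());
            sb.append(':');
            appendValue(sb, e.getValue());
        }
        return sb.append('}').toString();
    }

    private static void appendValue(StringBuilder sb, Object value) {
        if (value == null) {
            sb.append("null");                   // explicit null literal, never omitted
        } else if (value instanceof Number) {
            sb.append(value);                    // e.g. Double 48250.5 -> "48250.5"
        } else {
            appendString(sb, value.toString());  // Instant.toString() is ISO-8601 UTC
        }
    }

    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c)); // control characters
                    else sb.append(c);
            }
        }
        sb.append('"');
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>(); // insertion order deliberately differs from output order
        row.put("temperatureCelsius", -42.3);
        row.put("assetId", "PIPE-SEG-2024-NGL-001");
        row.put("ingestionTimestamp", Instant.parse("2024-01-15T10:30:00Z"));
        row.put("pressureBar", null);
        System.out.println(build(row));
        // {"assetId":"PIPE-SEG-2024-NGL-001","ingestionTimestamp":"2024-01-15T10:30:00Z","pressureBar":null,"temperatureCelsius":-42.3}
    }
}
```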
To simulate tampering, connect to PostgreSQL and alter a record directly:

```sql
-- Bypass the application and modify a record in place
UPDATE shipment_payloads SET cargo_weight_kg = 99999.99 WHERE asset_id = 'PIPE-SEG-2024-NGL-001';
```

On the next reconciliation cycle (or when triggered manually via a direct method call), the engine will:

- Recompute SHA-256 of the modified row, producing a different hash.
- Compare it against the immutable on-chain hash and detect the mismatch.
- Set `anchorStatus = HASH_MISMATCH` and emit a `WARN` log.
Kafka Consumer Thread
│
├─ 1. Consume ShipmentPayload event (key = assetId)
├─ 2. Reload canonical record from PostgreSQL (source of truth)
├─ 3. Idempotency check: skip if already ANCHORED
├─ 4. buildCanonicalJson() → deterministic string
├─ 5. SHA-256(canonicalJson) → 32 bytes
├─ 6. Store hash to DB (PENDING state)
├─ 7. RawTransactionManager.sign(anchorAsset(assetId, bytes32))
├─ 8. eth_sendRawTransaction → Anvil mempool
├─ 9. Poll for TransactionReceipt (blocks this thread, not HTTP)
└─ 10. Update DB: status=ANCHORED, onChainTxHash=receipt.hash
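Steps 3, 6, and 10 are what make redelivered Kafka events harmless. The in-memory sketch below illustrates that ordering; `ChainClient` is a hypothetical stand-in for the Web3j wrapper call (steps 7 to 9), the map stands in for `shipment_payloads`, and hashing (steps 4 and 5) is taken as an input.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, in-memory sketch of Web3Worker steps 3, 6 and 10 (single consumer thread assumed). */
class IdempotentAnchorWorker {

    /** Stand-in for the wrapper call; returns the transaction hash once the receipt arrives. */
    interface ChainClient {
        String anchorAndAwaitReceipt(String assetId, byte[] dataHash);
    }

    static final class Row {                 // stand-in for a shipment_payloads row
        String status = "PENDING";
        byte[] dataHash;
        String txHash;
    }

    private final Map<String, Row> db = new HashMap<>();
    private final ChainClient chain;

    IdempotentAnchorWorker(ChainClient chain) {
        this.chain = chain;
    }

    Row row(String assetId) {
        return db.computeIfAbsent(assetId, k -> new Row());
    }

    /** Handles one (possibly redelivered) event for assetId. */
    void onEvent(String assetId, byte[] computedHash) {
        Row row = row(assetId);
        if ("ANCHORED".equals(row.status)) {
            return;                                                  // step 3: redelivery after success is a no-op
        }
        row.dataHash = computedHash;                                 // step 6: persist hash while still PENDING
        row.txHash = chain.anchorAndAwaitReceipt(assetId, computedHash); // steps 7-9
        row.status = "ANCHORED";                                     // step 10
    }

    public static void main(String[] args) {
        IdempotentAnchorWorker worker = new IdempotentAnchorWorker((id, hash) -> "0xfa2c");
        byte[] hash = new byte[32];
        worker.onEvent("PIPE-001", hash);
        worker.onEvent("PIPE-001", hash);    // redelivered event: skipped
        System.out.println(worker.row("PIPE-001").status + " " + worker.row("PIPE-001").txHash); // ANCHORED 0xfa2c
    }
}
```

A crash between the receipt and the DB update still leads to a second submission on redelivery; how the contract treats a repeated `anchorAsset` for the same `assetId` is outside this sketch.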
Transaction Signing: RawTransactionManager signs transactions locally using the configured
private key via secp256k1 ECDSA. No external wallet, MetaMask, or remote signer is required.
The chain ID (31337 for Anvil) is included in the signed transaction to prevent replay attacks
on other networks (EIP-155).
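For legacy transactions, EIP-155 binds the chain ID into the signature's recovery value: `v = chainId * 2 + 35 + recoveryId`, with `recoveryId` 0 or 1, so nodes on a chain with a different ID reject the transaction. (EIP-1559 transactions carry the chain ID as an explicit field instead.) The arithmetic below only illustrates that formula with a hypothetical class; in the project, `RawTransactionManager` and Web3j's signer do this work.

```java
/** Illustrative EIP-155 arithmetic for legacy transactions (hypothetical class name). */
class Eip155 {

    /** v value of a legacy EIP-155 signature; recoveryId is the secp256k1 recovery bit (0 or 1). */
    static long v(long chainId, int recoveryId) {
        if (recoveryId != 0 && recoveryId != 1) {
            throw new IllegalArgumentException("recoveryId must be 0 or 1");
        }
        return chainId * 2 + 35 + recoveryId;
    }

    /** Recovers the chain ID that a legacy EIP-155 v value commits to. */
    static long chainIdOf(long v) {
        if (v < 35) {
            throw new IllegalArgumentException("pre-EIP-155 v (27/28) carries no chain ID");
        }
        return (v - 35) / 2;    // integer division drops the recovery bit
    }

    public static void main(String[] args) {
        System.out.println(v(31337, 0));      // 62709 (Anvil)
        System.out.println(v(1, 1));          // 38 (Ethereum mainnet)
        System.out.println(chainIdOf(62710)); // 31337
    }
}
```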
The wrapper extends `org.web3j.tx.Contract` and encodes/decodes Solidity ABI types:

- `anchorAsset(String, byte[])` (the array must be exactly 32 bytes) → wrapped as `(Utf8String, Bytes32)` → ABI-encoded function call → `eth_sendRawTransaction`
- `getAsset(String)` → `eth_call` → decodes `(Address, Bytes32, Uint256)` → `Tuple3<String, byte[], BigInteger>`
Both Web3Worker and ReconciliationEngine share the same Web3j and Credentials beans,
ensuring a single underlying HTTP connection pool to Anvil rather than opening new connections
per service instantiation.
| Decision | Choice | Rationale |
|---|---|---|
| Hash algorithm | SHA-256 | JVM standard; 32-byte output maps exactly to Solidity `bytes32`; widely auditable |
| On-chain payload | Hash only | Gas: one fixed-size anchor (21,000 base plus a few storage writes) vs. millions of gas for raw data; keeps business data off-chain for regulatory privacy |
| Kafka message key | `assetId` | Guarantees partition-level ordering for the same asset |
| `max.poll.records=1` | 1 record per poll | Each poll cycle covers one transaction, so a slow confirmation cannot strand a fetched batch in memory or push a multi-record cycle past `max.poll.interval.ms`; later records wait durably in Kafka (records behind a slow one on the same partition are still delayed) |
| Chain ID in `RawTransactionManager` | 31337 | EIP-155 replay-attack protection; must match Anvil's `--chain-id` |
| `ReadonlyTransactionManager` in reconciliation | `eth_call` only | Zero gas, no state mutation; reconciliation should never write to the chain |
| `BINARY = ""` in wrapper | Empty | `BINARY` is only needed for `deploy()`; `load()` only needs ABI encoding |
| `anchorStatus = PENDING` before tx submission | Persist hash first | Ensures the hash is recorded in the DB even if the tx fails; avoids re-hash inconsistency on retry |
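Replace `spring.jpa.hibernate.ddl-auto=update` with Flyway for production-safe schema evolution:

```xml
<dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-core</artifactId>
</dependency>
```

Create `src/main/resources/db/migration/V1__create_shipment_payloads.sql`, and set `spring.jpa.hibernate.ddl-auto=validate` so Hibernate checks the schema instead of altering it. On Flyway 10 and later, PostgreSQL support also requires the `org.flywaydb:flyway-database-postgresql` dependency.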
In `KafkaConsumerConfig.java`, configure a `DefaultErrorHandler` with a `DeadLetterPublishingRecoverer` to route records that still fail after retries to a `shipment.ingested.DLT` topic (the recoverer's default `<topic>.DLT` naming) for operator review.
Deploy one Spring Boot instance per consortium member. Each member uses their own private key
but calls the same shared AssetTracker.sol contract. Ownership transfers use transferAsset().
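Update `application.properties`:

```properties
web3j.rpc.url=https://sepolia.infura.io/v3/<YOUR_PROJECT_ID>
web3j.chain.id=11155111
```

Store `web3j.private.key` in an environment variable or Vault secret, never in a committed file. With Spring Boot's relaxed binding, an environment variable named `WEB3J_PRIVATE_KEY` supplies `web3j.private.key` without any entry in the properties file.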
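If you also want a fail-fast check on the key's shape before any Web3j bean is created, a plain-Java sketch could look like the following. The class, the variable name `WEB3J_PRIVATE_KEY`, and the "optional `0x` plus 64 hex digits" rule are assumptions; the check deliberately never echoes the value.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical startup check: read the signing key from the environment, never from a committed file. */
class PrivateKeySource {
    private static final Pattern HEX_KEY = Pattern.compile("(0x)?[0-9a-fA-F]{64}");

    /** Returns the key from the given environment map if present and well formed. */
    static Optional<String> load(Map<String, String> env, String variable) {
        String raw = env.get(variable);
        if (raw == null || raw.isBlank()) {
            return Optional.empty();
        }
        String key = raw.trim();
        if (!HEX_KEY.matcher(key).matches()) {
            // Do not include the value in the message: it may be a (mistyped) secret.
            throw new IllegalStateException(variable + " is not a 32-byte hex private key");
        }
        return Optional.of(key);
    }

    public static void main(String[] args) {
        Optional<String> key = load(System.getenv(), "WEB3J_PRIVATE_KEY");
        System.out.println(key.isPresent() ? "signing key loaded" : "WEB3J_PRIVATE_KEY not set");
    }
}
```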
```yaml
# .github/workflows/generate-wrapper.yml
# Fragment: these steps belong under a job's `steps:` and assume solc and the web3j CLI are installed on the runner.
- name: Compile contract
  run: solc --abi --bin contracts/AssetTracker.sol -o contracts/build/
- name: Generate wrapper
  run: |
    web3j generate solidity \
      -a contracts/build/AssetTracker.abi \
      -b contracts/build/AssetTracker.bin \
      -o backend/src/main/java \
      -p com.enterprise.ledger.contract
```

MIT License: free to use, fork, and extend for learning and enterprise prototyping.