Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions backend/src/routes/health.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ const router = Router();
* tags:
* - Health
* summary: Detailed health check
* description: Returns detailed health information about the API, database, and indexer.
* description: |
* Returns liveness and readiness information.
* **Liveness** (200 vs 503) is determined by DB reachability alone.
* **Indexer lag** is reported in the body for observability but only
* forces a 503 when the indexer is actually enabled
* (`STREAM_CONTRACT_ID` env var set) and its state row is stale
* (lag > 60 s). A cold-started instance with no state row yet, or a
* deployment with the indexer intentionally disabled, always returns 200
* as long as the DB is reachable.
* responses:
* 200:
* description: Service is healthy
Expand All @@ -25,9 +33,14 @@ const router = Router();
* db:
* type: string
* example: connected
* indexerEnabled:
* type: boolean
* description: Whether the event indexer is configured
* example: true
* indexerLag:
* type: integer
* description: Seconds since last indexer update
* nullable: true
* description: Seconds since last indexer update, or null when no state row exists yet
* example: 5
* uptime:
* type: number
Expand All @@ -44,6 +57,9 @@ router.get('/', async (_req: Request, res: Response) => {
dbStatus = 'disconnected';
}

// Whether the event-indexer is configured (STREAM_CONTRACT_ID must be set for it to run).
const indexerEnabled = !!process.env.STREAM_CONTRACT_ID;

let indexerLag = -1;
try {
const state = await prisma.indexerState.findUnique({ where: { id: 'singleton' } });
Expand All @@ -52,17 +68,22 @@ router.get('/', async (_req: Request, res: Response) => {
const updatedAt = Math.floor(state.updatedAt.getTime() / 1000);
indexerLag = Math.max(0, now - updatedAt);
}
// indexerLag === -1 means no state row yet (cold start) — not an error.
} catch {
// If indexer state query fails, we treat it as degraded
indexerLag = -1;
}

const isHealthy = dbStatus === 'connected' && (indexerLag >= 0 && indexerLag <= 60);
// 503 only when: DB is down, OR the indexer is enabled and its state row is
// stale (lag > 60). A missing state row (lag === -1) is a cold-start
// condition, not a failure, even when the indexer is enabled.
const indexerDegraded = indexerEnabled && indexerLag > 60;
const isHealthy = dbStatus === 'connected' && !indexerDegraded;
const status = isHealthy ? 'ok' : 'degraded';

res.status(isHealthy ? 200 : 503).json({
status,
db: dbStatus,
indexerEnabled,
indexerLag: indexerLag === -1 ? null : indexerLag,
uptime: process.uptime(),
});
Expand Down
97 changes: 97 additions & 0 deletions backend/tests/health.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import request from 'supertest';

// Prisma mock — replaced per test to simulate DB up/down and indexer state.
const prismaMock = {
$queryRaw: vi.fn(),
indexerState: {
findUnique: vi.fn(),
},
};

vi.mock('../src/lib/prisma.js', () => ({
prisma: prismaMock,
default: prismaMock,
}));

import app from '../src/app.js';

function makeState(lagSeconds: number) {
const updatedAt = new Date(Date.now() - lagSeconds * 1000);
return { id: 'singleton', updatedAt };
}

describe('GET /health', () => {
beforeEach(() => {
vi.unstubAllEnvs();
prismaMock.$queryRaw.mockResolvedValue([{ '?column?': 1n }]);
prismaMock.indexerState.findUnique.mockResolvedValue(null);
});

afterEach(() => {
vi.restoreAllMocks();
});

it('returns 200 when DB is up and indexer is disabled (no STREAM_CONTRACT_ID)', async () => {
vi.stubEnv('STREAM_CONTRACT_ID', '');

const res = await request(app).get('/health');
expect(res.status).toBe(200);
expect(res.body.status).toBe('ok');
expect(res.body.db).toBe('connected');
expect(res.body.indexerEnabled).toBe(false);
expect(res.body.indexerLag).toBeNull();
});

it('returns 200 when DB is up and indexer is enabled but has no state row yet (cold start)', async () => {
vi.stubEnv('STREAM_CONTRACT_ID', 'CSOME_CONTRACT_ADDRESS');
prismaMock.indexerState.findUnique.mockResolvedValue(null);

const res = await request(app).get('/health');
expect(res.status).toBe(200);
expect(res.body.status).toBe('ok');
expect(res.body.indexerEnabled).toBe(true);
expect(res.body.indexerLag).toBeNull();
});

it('returns 200 when DB is up, indexer enabled, and lag is within threshold', async () => {
vi.stubEnv('STREAM_CONTRACT_ID', 'CSOME_CONTRACT_ADDRESS');
prismaMock.indexerState.findUnique.mockResolvedValue(makeState(30));

const res = await request(app).get('/health');
expect(res.status).toBe(200);
expect(res.body.status).toBe('ok');
expect(res.body.indexerLag).toBeGreaterThanOrEqual(0);
expect(res.body.indexerLag).toBeLessThanOrEqual(60);
});

it('returns 503 when DB is up, indexer enabled, and lag exceeds 60 s', async () => {
vi.stubEnv('STREAM_CONTRACT_ID', 'CSOME_CONTRACT_ADDRESS');
prismaMock.indexerState.findUnique.mockResolvedValue(makeState(120));

const res = await request(app).get('/health');
expect(res.status).toBe(503);
expect(res.body.status).toBe('degraded');
expect(res.body.indexerLag).toBeGreaterThan(60);
});

it('returns 503 when DB is down regardless of indexer state', async () => {
vi.stubEnv('STREAM_CONTRACT_ID', '');
prismaMock.$queryRaw.mockRejectedValue(new Error('DB connection refused'));

const res = await request(app).get('/health');
expect(res.status).toBe(503);
expect(res.body.status).toBe('degraded');
expect(res.body.db).toBe('disconnected');
});

it('returns 200 with indexerLag in body for observability', async () => {
vi.stubEnv('STREAM_CONTRACT_ID', 'CSOME_CONTRACT_ADDRESS');
prismaMock.indexerState.findUnique.mockResolvedValue(makeState(10));

const res = await request(app).get('/health');
expect(res.status).toBe(200);
expect(typeof res.body.indexerLag).toBe('number');
expect(typeof res.body.uptime).toBe('number');
});
});
2 changes: 2 additions & 0 deletions contracts/stream_contract/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ pub enum StreamError {
InvalidDuration = 9,
/// Supplied token address is not a valid token contract.
InvalidTokenAddress = 10,
/// `amount / duration` rounds to zero — the stream would lock tokens but never accrue.
InvalidRate = 11,
}
10 changes: 10 additions & 0 deletions contracts/stream_contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ impl StreamContract {
let net_amount = Self::collect_fee(&env, &token_address, amount, stream_id);
let rate_per_second = net_amount / (duration as i128);

// Reject streams where integer division rounds the rate to zero.
// Such a stream would lock the sender's tokens in the contract while
// never accruing anything to the recipient — almost always a caller
// mistake (wrong decimals or an excessively long duration).
// Soroban rolls back the entire transaction on Err, so the token
// transfer above is unwound automatically.
if rate_per_second == 0 {
return Err(StreamError::InvalidRate);
}

save_stream(
&env,
stream_id,
Expand Down
32 changes: 24 additions & 8 deletions contracts/stream_contract/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,22 +1070,38 @@ fn test_create_stream_self_stream() {

#[test]
fn test_create_stream_zero_rate() {
// amount < duration → rate_per_second rounds to 0 via integer division.
// The stream is created but will never accrue anything.
// amount < duration → rate_per_second rounds to 0; must now be rejected.
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
mint(&env, &token, &sender, 1);

let client = create_contract(&env);
let id = client.create_stream(&sender, &Address::generate(&env), &token, &1, &1_000);
let s = client.get_stream(&id).unwrap();
assert_eq!(s.rate_per_second, 0);
let result = client.try_create_stream(
&sender,
&Address::generate(&env),
&token,
&1,
&1_000,
);
assert_eq!(result, Err(Ok(StreamError::InvalidRate)));
}

// Advance time — nothing should be claimable.
env.ledger().with_mut(|l| l.timestamp += 500);
assert_eq!(client.get_claimable_amount(&id), Some(0));
#[test]
fn test_create_stream_rate_exactly_one_succeeds() {
// amount == duration → rate = 1, which is the smallest valid rate.
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
mint(&env, &token, &sender, 100);

let client = create_contract(&env);
let id = client.create_stream(&sender, &Address::generate(&env), &token, &100, &100);
let s = client.get_stream(&id).unwrap();
assert_eq!(s.rate_per_second, 1);
assert!(s.is_active);
}

#[test]
Expand Down
104 changes: 92 additions & 12 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,8 @@

This document explains how FlowFi moves data from on-chain contract events into API responses and real-time frontend updates.

# FlowFi Architecture

This document explains how FlowFi moves data from on-chain contract events into API responses and real-time frontend updates.

## High-Level Overview

```mermaid
flowchart LR
Contract[Stream Contract (Soroban WASM)] --> Indexer[Soroban Event Indexer]
Indexer --> DB[(Postgres DB)]
DB --> API[Backend API (Express + SSE)]
API --> UI[Frontend (Next.js)]
UI --> API

```mermaid
flowchart LR
C["Soroban Stream Contract\nEvent Emission"] --> W["Event Worker / Indexer\nbackend/src/workers"]
Expand Down Expand Up @@ -155,3 +143,95 @@ Benefits:
1. `/v1/events/stats` exposes active SSE connections and connection-capacity metrics.
1. Admin metrics include SSE peak-per-IP visibility for abuse monitoring.
1. User summary endpoint (`/v1/users/{address}/summary`) is cached for 30s to protect DB hot paths.

---

## Event Indexing & Real-Time Updates

### Data-Flow Overview

```
Soroban RPC
│ poll for new contract events
SorobanEventWorker (backend/src/workers/soroban-event-worker.ts)
│ normalize payload, upsert Stream row, insert StreamEvent row
PostgreSQL (via Prisma)
│ StreamEvent table / Stream table updated
SSE broadcast (backend/src/services/sseService.ts)
│ pushes typed event to sse:stream:<id> and sse:user:<address> channels
Frontend useStreamEvents hook (frontend/src/hooks/useStreamEvents.ts)
│ receives event over long-lived SSE connection
Dashboard / NotificationDropdown re-render with live data
```

### Deduplication

`StreamEvent` rows carry a compound unique constraint:

```
@@unique([transactionHash, eventType])
```

This means replaying the same on-chain transaction (e.g. during a re-index or worker restart) will produce an `upsert` conflict rather than a duplicate row. The worker uses Prisma's `createOrUpdate` (upsert) path on `Stream` and a `createMany … skipDuplicates` path on `StreamEvent`.

### Indexer Cursor — `IndexerState`

The worker persists its progress in the `IndexerState` table (a single-row ledger-sequence cursor). On each poll cycle:

1. Read the stored `lastIndexedLedger` value.
2. Query the Soroban RPC for events emitted in `(lastIndexedLedger, latestLedger]`.
3. Process and persist events.
4. Update `IndexerState.lastIndexedLedger` to `latestLedger`.

On a cold start (no `IndexerState` row) the worker begins from a configured genesis ledger so historical streams are backfilled.

### Stale-Read Fallback

When the DB row for a stream was last updated more than a configurable threshold ago (`isStale` check in `backend/src/services/sorobanService.ts`), the API falls back to a live Soroban RPC call instead of serving the cached DB value. This keeps claimable-balance figures accurate even if the indexer lags.

---

## Action Signing Model

FlowFi actions split into two categories based on who holds the signing key:

| Action | Signer | How |
|---|---|---|
| **Top-up** | Server (custodial) | Backend submits the transaction using `KEEPER_SECRET_KEY`. The frontend sends only the stream ID and amount. |
| **Withdraw** | Wallet (non-custodial) | Frontend builds and signs the transaction via the connected wallet (Freighter). The backend currently only simulates server-side; the real transaction is signed and submitted by the frontend. |
| **Pause / Resume** | Wallet (non-custodial) | Same as withdraw — frontend-signed. The backend simulate endpoints exist for fee estimation but do not submit. |
| **Create stream** | Wallet (non-custodial) | Frontend signs via wallet and submits directly to the RPC. |

> **Important for contributors:** Do not wire pause/resume/withdraw to a server-side submit path. Only `top-up` is intentionally custodial. All other mutating actions must be wallet-signed by the user.

---

## Required Environment Variables

To run the full stack end-to-end, set the following secrets. See [`backend/.env.example`](../backend/.env.example) for the canonical list.

### Backend

| Variable | Purpose |
|---|---|
| `DATABASE_URL` | PostgreSQL connection string (Prisma) |
| `SOROBAN_RPC_URL` | Soroban RPC endpoint (e.g. Testnet: `https://soroban-testnet.stellar.org`) |
| `STREAMING_CONTRACT_ADDRESS` | Deployed FlowFi stream contract ID |
| `KEEPER_SECRET_KEY` | Server wallet secret key used to sign custodial top-up transactions |
| `JWT_SECRET` | Secret used to sign and verify auth JWTs |
| `REDIS_URL` | Redis connection string (only needed for multi-instance SSE fanout) |
| `STELLAR_NETWORK` | `TESTNET` or `MAINNET` |

### Frontend

| Variable | Purpose |
|---|---|
| `NEXT_PUBLIC_API_URL` | Base URL of the backend API (e.g. `http://localhost:3001/v1`) |
| `NEXT_PUBLIC_STREAMING_CONTRACT` | Contract address displayed in the Settings page |
| `NEXT_PUBLIC_STELLAR_NETWORK` | `TESTNET` or `MAINNET` — must match the backend value |
| `NEXT_PUBLIC_APP_VERSION` | Displayed in Settings; optional, defaults to `1.0.0` |
4 changes: 4 additions & 0 deletions frontend/src/app/settings/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ export default function SettingsPage() {

<button
onClick={handleBrowserPushToggle}
aria-label="Toggle browser notifications"
aria-pressed={browserPush}
className={`relative w-14 h-7 rounded-full transition-all duration-300 ${
browserPush
? "bg-gradient-to-r from-purple-500 to-blue-500"
Expand Down Expand Up @@ -326,6 +328,7 @@ export default function SettingsPage() {

<button
onClick={copyAddress}
aria-label={copied ? "Address copied" : "Copy wallet address"}
className="ml-3 opacity-70 hover:opacity-100 transition flex-shrink-0"
>
{copied ? (
Expand Down Expand Up @@ -386,6 +389,7 @@ export default function SettingsPage() {
navigator.clipboard.writeText(CONTRACT_ADDRESS);
toast.success("Contract address copied");
}}
aria-label="Copy contract address"
className="opacity-60 hover:opacity-100 transition"
>
<Copy size={14} />
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/ModeToggle.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export function ModeToggle() {
// Prevent hydration mismatch
if (!isMounted) {
return (
<button className="inline-flex items-center justify-center w-8 h-8">
<button className="inline-flex items-center justify-center w-8 h-8" aria-label="Toggle theme">
<span className="sr-only">Toggle theme</span>
</button>
);
Expand Down
Loading
Loading