Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/**
* Integration tests for the Soroban event worker with mocked Prisma/SSE.
*
* Exercises indexer → DB → GET API wiring without a real Postgres instance.
* Governance events (fee_config_updated, admin_transferred) and per-stream
* lifecycle handlers are covered here; full DB-backed flows live in
* stream-lifecycle.test.ts.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import request from 'supertest';
import { nativeToScVal, xdr, StrKey, Keypair } from '@stellar/stellar-sdk';
Expand Down Expand Up @@ -34,29 +42,27 @@ const { mockPrisma, mockSseService } = vi.hoisted(() => ({
user: {
upsert: vi.fn().mockResolvedValue({}),
},
$transaction: vi.fn(async (fn: any) => fn(mockPrisma)),
$transaction: vi.fn(async (fn: (client: typeof mockPrisma) => unknown) => fn(mockPrisma)),
$queryRaw: vi.fn().mockResolvedValue([{ '?column?': 1n }]),
$disconnect: vi.fn(),
}
},
}));

vi.mock('../../lib/prisma.js', () => ({
vi.mock('../../src/lib/prisma.js', () => ({
prisma: mockPrisma,
default: mockPrisma,
}));

vi.mock('../../services/sse.service.js', () => ({
vi.mock('../../src/services/sse.service.js', () => ({
sseService: mockSseService,
}));

// ─── App import (after mocks) ─────────────────────────────────────────────────

import app from '../../app.js';
import { sorobanEventWorker } from '../../workers/soroban-event-worker.js';

const describeIfDatabase = process.env.DATABASE_URL ? describe : describe.skip;
import app from '../../src/app.js';
import { sorobanEventWorker } from '../../src/workers/soroban-event-worker.js';

describeIfDatabase('Stream Lifecycle Integration Tests', () => {
describe('Indexer worker integration (mocked DB)', () => {
const senderPair = Keypair.random();
const recipientPair = Keypair.random();
const sender = senderPair.publicKey();
Expand Down Expand Up @@ -122,7 +128,6 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {

await sorobanEventWorker.processEvent(event);

// Verify stream appears in GET API
const res = await request(app).get(`/v1/streams/${streamId}`);
expect(res.status).toBe(200);
expect(res.body.streamId).toBe(streamId);
Expand Down Expand Up @@ -157,7 +162,7 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {
expect.objectContaining({
where: { streamId },
data: expect.objectContaining({ isPaused: true }),
})
}),
);
});

Expand Down Expand Up @@ -235,7 +240,7 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {
it('GET /v1/streams/{id}/events returns events', async () => {
mockPrisma.stream.findUnique.mockResolvedValue({ streamId });
mockPrisma.streamEvent.findMany.mockResolvedValue([
{ id: 'evt-1', eventType: 'CREATED', transactionHash: 'hash' }
{ id: 'evt-1', eventType: 'CREATED', transactionHash: 'hash' },
]);
mockPrisma.streamEvent.count.mockResolvedValue(1);

Expand All @@ -255,9 +260,7 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {
txHash: 'hash-fee-config',
ledger: 105,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('fee_config_updated'),
],
topic: [xdr.ScVal.scvSymbol('fee_config_updated')],
value: xdr.ScVal.scvMap([
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('admin'),
Expand Down Expand Up @@ -287,25 +290,30 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {
expect(mockPrisma.user.upsert).toHaveBeenCalledWith(
expect.objectContaining({
where: { publicKey: 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF' },
})
}),
);

expect(mockPrisma.stream.upsert).toHaveBeenCalledWith(
expect.objectContaining({
where: { streamId: 0 },
})
}),
);

expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith(
expect.objectContaining({
where: { transactionHash_eventType: { transactionHash: 'hash-fee-config', eventType: 'FEE_CONFIG_UPDATED' } },
where: {
transactionHash_eventType: {
transactionHash: 'hash-fee-config',
eventType: 'FEE_CONFIG_UPDATED',
},
},
create: expect.objectContaining({
streamId: 0,
eventType: 'FEE_CONFIG_UPDATED',
transactionHash: 'hash-fee-config',
ledgerSequence: 105,
}),
})
}),
);

expect(mockSseService.broadcastToAdmin).toHaveBeenCalledWith(
Expand All @@ -316,7 +324,7 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {
newTreasury,
oldFeeRateBps: 100,
newFeeRateBps: 200,
})
}),
);
});

Expand All @@ -329,9 +337,7 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {
txHash: 'hash-admin-transfer',
ledger: 106,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('admin_transferred'),
],
topic: [xdr.ScVal.scvSymbol('admin_transferred')],
value: xdr.ScVal.scvMap([
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('previous_admin'),
Expand All @@ -348,23 +354,27 @@ describeIfDatabase('Stream Lifecycle Integration Tests', () => {

expect(mockPrisma.streamEvent.upsert).toHaveBeenCalledWith(
expect.objectContaining({
where: { transactionHash_eventType: { transactionHash: 'hash-admin-transfer', eventType: 'ADMIN_TRANSFERRED' } },
where: {
transactionHash_eventType: {
transactionHash: 'hash-admin-transfer',
eventType: 'ADMIN_TRANSFERRED',
},
},
create: expect.objectContaining({
streamId: 0,
eventType: 'ADMIN_TRANSFERRED',
transactionHash: 'hash-admin-transfer',
ledgerSequence: 106,
}),
})
}),
);

expect(mockSseService.broadcastToAdmin).toHaveBeenCalledWith(
'stream.admin_transferred',
expect.objectContaining({
previousAdmin,
newAdmin,
})
}),
);
});
});

163 changes: 4 additions & 159 deletions backend/tests/integration/streams.test.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
/**
* Integration tests for the full stream lifecycle.
* Integration tests for stream HTTP routes with mocked Prisma/SSE.
*
* These tests mock the Prisma client and SSE service so they run in CI without
* a real Postgres or Redis instance. They verify that the indexer worker, stream
* controller, SSE broadcast, and RPC fallback all wire up correctly end-to-end.
* Indexer worker lifecycle flows are covered in indexer-worker.test.ts (mocked
* worker) and stream-lifecycle.test.ts (real Postgres). This file focuses on
* claimable RPC fallback, SSE broadcast contracts, and events pagination.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import request from 'supertest';

// Bypass Stellar signature verification on POST /v1/streams. The route is
// exercised here as a stand-in for the indexer worker, so we replace the auth
// middleware with a stub that injects a deterministic wallet.
vi.mock('../../src/middleware/auth.middleware.js', () => ({
authMiddleware: (req: any, _res: any, next: any) => {
req.user = { publicKey: 'GTEST_USER_PUBLIC_KEY' };
next();
},
optionalAuthMiddleware: (req: any, _res: any, next: any) => {
req.user = { publicKey: 'GTEST_USER_PUBLIC_KEY' };
next();
},
}));

// ─── Mocks (using vi.hoisted to ensure they are available to vi.mock) ─────────

const { mockSseService, mockPrisma } = vi.hoisted(() => ({
Expand Down Expand Up @@ -113,147 +99,6 @@ function makeStream(overrides: Partial<Record<string, unknown>> = {}) {

// ─── Test suites ──────────────────────────────────────────────────────────────

describe('Indexer → stream_created: stream appears in GET /v1/streams/:id', () => {
beforeEach(() => { vi.clearAllMocks(); });

it('creates a stream via POST and retrieves it via GET', async () => {
const stream = makeStream();
mockPrisma.stream.upsert.mockResolvedValue(stream);
mockPrisma.stream.findUnique.mockResolvedValue(stream);

// POST simulates what the indexer would do after processing stream_created
const createRes = await request(app)
.post('/v1/streams')
.send({
streamId: '1',
sender: SENDER,
recipient: RECIPIENT,
tokenAddress: TOKEN,
ratePerSecond: '10',
depositedAmount: '86400',
startTime: '1700000000',
});

expect(createRes.status).toBe(201);
expect(createRes.body.streamId).toBe(1);

// GET the same stream
const getRes = await request(app).get('/v1/streams/1');
expect(getRes.status).toBe(200);
expect(getRes.body.streamId).toBe(1);
expect(getRes.body.isActive).toBe(true);
});
});

describe('Indexer → stream_topped_up: depositedAmount updated', () => {
beforeEach(() => { vi.clearAllMocks(); });

it('GET reflects updated depositedAmount after top-up upsert', async () => {
const after = makeStream({ depositedAmount: '172800' });
mockPrisma.stream.upsert.mockResolvedValue(after);
mockPrisma.stream.findUnique.mockResolvedValue(after);

const createRes = await request(app)
.post('/v1/streams')
.send({
streamId: '1',
sender: SENDER,
recipient: RECIPIENT,
tokenAddress: TOKEN,
ratePerSecond: '10',
depositedAmount: '172800',
startTime: '1700000000',
});
expect(createRes.status).toBe(201);

const getRes = await request(app).get('/v1/streams/1');
expect(getRes.status).toBe(200);
expect(getRes.body.depositedAmount).toBe('172800');
});
});

describe('Indexer → stream_paused: isPaused = true, accrual stops', () => {
beforeEach(() => { vi.clearAllMocks(); });

it('GET returns isPaused=true after paused upsert', async () => {
const paused = makeStream({ isPaused: true, isActive: true });
mockPrisma.stream.upsert.mockResolvedValue(paused);
mockPrisma.stream.findUnique.mockResolvedValue(paused);

const createRes = await request(app)
.post('/v1/streams')
.send({
streamId: '1',
sender: SENDER,
recipient: RECIPIENT,
tokenAddress: TOKEN,
ratePerSecond: '10',
depositedAmount: '86400',
startTime: '1700000000',
});
expect(createRes.status).toBe(201);

const getRes = await request(app).get('/v1/streams/1');
expect(getRes.status).toBe(200);
// isPaused reflects the indexer's last write
expect(getRes.body.isPaused ?? false).toBe(true);
});
});

describe('Indexer → stream_resumed: isPaused = false, accrual resumes', () => {
beforeEach(() => { vi.clearAllMocks(); });

it('GET returns isPaused=false after resumed upsert', async () => {
const resumed = makeStream({ isPaused: false, isActive: true });
mockPrisma.stream.upsert.mockResolvedValue(resumed);
mockPrisma.stream.findUnique.mockResolvedValue(resumed);

const createRes = await request(app)
.post('/v1/streams')
.send({
streamId: '1',
sender: SENDER,
recipient: RECIPIENT,
tokenAddress: TOKEN,
ratePerSecond: '10',
depositedAmount: '86400',
startTime: '1700000000',
});
expect(createRes.status).toBe(201);

const getRes = await request(app).get('/v1/streams/1');
expect(getRes.status).toBe(200);
expect(getRes.body.isPaused ?? false).toBe(false);
});
});

describe('Indexer → stream_cancelled: isActive = false', () => {
beforeEach(() => { vi.clearAllMocks(); });

it('GET returns isActive=false after cancelled upsert', async () => {
const cancelled = makeStream({ isActive: false });
mockPrisma.stream.upsert.mockResolvedValue(cancelled);
mockPrisma.stream.findUnique.mockResolvedValue(cancelled);

const createRes = await request(app)
.post('/v1/streams')
.send({
streamId: '1',
sender: SENDER,
recipient: RECIPIENT,
tokenAddress: TOKEN,
ratePerSecond: '10',
depositedAmount: '86400',
startTime: '1700000000',
});
expect(createRes.status).toBe(201);

const getRes = await request(app).get('/v1/streams/1');
expect(getRes.status).toBe(200);
expect(getRes.body.isActive).toBe(false);
});
});

describe('Stale DB (>30s) → GET /v1/streams/:id/claimable falls back to RPC', () => {
beforeEach(() => { vi.clearAllMocks(); });

Expand Down
2 changes: 1 addition & 1 deletion backend/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export default defineConfig({
environment: 'node',
globals: true,
setupFiles: [],
include: ['tests/**/*.{test,spec}.ts', 'src/__tests__/**/*.{test,spec}.ts'],
include: ['tests/**/*.{test,spec}.ts'],
coverage: {
enabled: true,
provider: 'v8',
Expand Down
Loading