From a1a95a782a27d728f56ade833630fd19ea61319b Mon Sep 17 00:00:00 2001 From: dannyy2000 Date: Fri, 29 May 2026 00:22:02 +0100 Subject: [PATCH] fix(indexer): guard against division-by-zero on zero-rate streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Zero-rate streams (rate_per_second == 0) are a valid on-chain state when the deposited amount is smaller than the stream duration in seconds — the contract rounds the rate down to zero. The previous code computed endTime via BigInt division and threw RangeError on such events, causing the handler to reject and the stream + event to go un-indexed. Guard both handleStreamCreated and handleStreamToppedUp: when ratePerSecond is 0n, skip the division entirely and store endTime as null. Add a regression test that feeds a stream_created event with rate_per_second=0 and asserts the stream and streamEvent rows are persisted without throwing. Also fix the pre-existing broadcastToAdmin mock gap in the SSE service stub. Closes #450 #451 #452 #453 --- backend/src/workers/soroban-event-worker.ts | 19 ++++-- backend/tests/soroban-event-worker.test.ts | 64 +++++++++++++++++++++ 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index 436d82c..c9bf2a1 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -312,10 +312,12 @@ export class SorobanEventWorker { const ratePerSecond = decodeI128(body['rate_per_second']); const depositedAmount = decodeI128(body['deposited_amount']); const startTime = Number(decodeU64(body['start_time'])); - - // Compute expected end time (assuming no pauses yet) - const durationSeconds = Number(BigInt(depositedAmount) / BigInt(ratePerSecond)); - const endTime = startTime + durationSeconds; + + const ratePerSecondBigInt = BigInt(ratePerSecond); + const endTime = + ratePerSecondBigInt === 0n + ? null + : startTime + Number(BigInt(depositedAmount) / ratePerSecondBigInt); await prisma.$transaction(async (tx: any) => { await tx.user.upsert({ @@ -412,8 +414,13 @@ export class SorobanEventWorker { select: { ratePerSecond: true, startTime: true, totalPausedDuration: true } }); - const durationSeconds = Number(BigInt(newDepositedAmount) / BigInt(stream.ratePerSecond)); - const newEndTime = stream.startTime + durationSeconds + stream.totalPausedDuration; + const ratePerSecondBigInt = BigInt(stream.ratePerSecond); + const newEndTime = + ratePerSecondBigInt === 0n + ? null + : stream.startTime + + Number(BigInt(newDepositedAmount) / ratePerSecondBigInt) + + stream.totalPausedDuration; await tx.stream.update({ where: { streamId }, diff --git a/backend/tests/soroban-event-worker.test.ts b/backend/tests/soroban-event-worker.test.ts index ebd69aa..60c3901 100644 --- a/backend/tests/soroban-event-worker.test.ts +++ b/backend/tests/soroban-event-worker.test.ts @@ -48,6 +48,7 @@ vi.mock('../src/services/sse.service.js', () => ({ sseService: { broadcastToStream: vi.fn(), broadcast: vi.fn(), + broadcastToAdmin: vi.fn(), }, })); @@ -158,6 +159,69 @@ describe('SorobanEventWorker', () => { ); }); + it('should persist a zero-rate stream_created event without throwing', async () => { + const txHash = 'zero-rate-tx-hash'; + const streamId = 77; + + const mockEvent: rpc.Api.EventResponse = { + id: 'zero-rate-event-1', + type: 'contract', + ledger: 2000, + ledgerClosedAt: '2024-06-01T00:00:00Z', + txHash, + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'stream_created' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'sender' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'token_address' }), val: () => ({ address: () => ({ switch: () => ({ value: 1 }), contractId: () => Buffer.alloc(32) }) }) }, + // rate_per_second = 0 (hi=0, lo=0) + { key: () => ({ sym: () => 'rate_per_second' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '0' }) }) }) }, + { key: () => ({ sym: () => 'deposited_amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '500' }) }) }) }, + { key: () => ({ sym: () => 'start_time' }), val: () => ({ u64: () => ({ toString: () => '1700000000' }) }) }, + ] as any, + } as any, + }; + + let capturedStreamUpsert: any = null; + const mockTx = { + user: { upsert: vi.fn().mockResolvedValue({}) }, + stream: { + upsert: vi.fn().mockImplementation((args) => { + capturedStreamUpsert = args; + return Promise.resolve({ streamId, isActive: true }); + }), + }, + streamEvent: { + findUnique: vi.fn().mockResolvedValue(null), + upsert: vi.fn().mockResolvedValue({ id: 'event-zero-rate' }), + }, + }; + + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // Must not throw + await expect( + (worker as any).handleStreamCreated(mockEvent, mockEvent.topic![1]) + ).resolves.not.toThrow(); + + // Stream was persisted + expect(mockTx.stream.upsert).toHaveBeenCalledTimes(1); + + // endTime must be null — never computed via division + expect(capturedStreamUpsert?.create?.endTime).toBeNull(); + + // StreamEvent row was also persisted + expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1); + }); + it('should handle duplicate fee collection events', async () => { const eventId = 'test-fee-event'; const txHash = 'test-fee-tx-hash';