Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,12 @@
const ratePerSecond = decodeI128(body['rate_per_second']);
const depositedAmount = decodeI128(body['deposited_amount']);
const startTime = Number(decodeU64(body['start_time']));

// Compute expected end time (assuming no pauses yet)
const durationSeconds = Number(BigInt(depositedAmount) / BigInt(ratePerSecond));
const endTime = startTime + durationSeconds;

const ratePerSecondBigInt = BigInt(ratePerSecond);
const endTime =
ratePerSecondBigInt === 0n
? null
: startTime + Number(BigInt(depositedAmount) / ratePerSecondBigInt);

await prisma.$transaction(async (tx: any) => {
await tx.user.upsert({
Expand Down Expand Up @@ -355,7 +357,7 @@
},
});

const existingEvent = await tx.streamEvent.findUnique({

Check failure on line 360 in backend/src/workers/soroban-event-worker.ts

View workflow job for this annotation

GitHub Actions / Backend npm test

src/__tests__/integration/streams.test.ts > Stream Lifecycle Integration Tests > Indexer processes stream_created event -> stream appears in GET /v1/streams/{id}

TypeError: tx.streamEvent.findUnique is not a function ❯ src/workers/soroban-event-worker.ts:360:50
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } },
select: { id: true },
});
Expand Down Expand Up @@ -412,8 +414,13 @@
select: { ratePerSecond: true, startTime: true, totalPausedDuration: true }
});

const durationSeconds = Number(BigInt(newDepositedAmount) / BigInt(stream.ratePerSecond));
const newEndTime = stream.startTime + durationSeconds + stream.totalPausedDuration;
const ratePerSecondBigInt = BigInt(stream.ratePerSecond);
const newEndTime =
ratePerSecondBigInt === 0n
? null
: stream.startTime +
Number(BigInt(newDepositedAmount) / ratePerSecondBigInt) +
stream.totalPausedDuration;

await tx.stream.update({
where: { streamId },
Expand Down Expand Up @@ -548,7 +555,7 @@
},
});

const existingEvent = await tx.streamEvent.findUnique({

Check failure on line 558 in backend/src/workers/soroban-event-worker.ts

View workflow job for this annotation

GitHub Actions / Backend npm test

src/__tests__/integration/streams.test.ts > Stream Lifecycle Integration Tests > Indexer processes stream_cancelled -> stream isActive = false

TypeError: tx.streamEvent.findUnique is not a function ❯ src/workers/soroban-event-worker.ts:558:50
where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } },
select: { id: true },
});
Expand Down
64 changes: 64 additions & 0 deletions backend/tests/soroban-event-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ vi.mock('../src/services/sse.service.js', () => ({
sseService: {
broadcastToStream: vi.fn(),
broadcast: vi.fn(),
broadcastToAdmin: vi.fn(),
},
}));

Expand Down Expand Up @@ -158,6 +159,69 @@ describe('SorobanEventWorker', () => {
);
});

it('should persist a zero-rate stream_created event without throwing', async () => {
const txHash = 'zero-rate-tx-hash';
const streamId = 77;

const mockEvent: rpc.Api.EventResponse = {
id: 'zero-rate-event-1',
type: 'contract',
ledger: 2000,
ledgerClosedAt: '2024-06-01T00:00:00Z',
txHash,
transactionIndex: 0,
operationIndex: 0,
inSuccessfulContractCall: true,
topic: [
{ switch: () => ({ value: 0 }), sym: () => 'stream_created' } as any,
{ switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any,
],
value: {
switch: () => ({ value: 4 }),
map: () => [
{ key: () => ({ sym: () => 'sender' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) },
{ key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) },
{ key: () => ({ sym: () => 'token_address' }), val: () => ({ address: () => ({ switch: () => ({ value: 1 }), contractId: () => Buffer.alloc(32) }) }) },
// rate_per_second = 0 (hi=0, lo=0)
{ key: () => ({ sym: () => 'rate_per_second' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '0' }) }) }) },
{ key: () => ({ sym: () => 'deposited_amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '500' }) }) }) },
{ key: () => ({ sym: () => 'start_time' }), val: () => ({ u64: () => ({ toString: () => '1700000000' }) }) },
] as any,
} as any,
};

let capturedStreamUpsert: any = null;
const mockTx = {
user: { upsert: vi.fn().mockResolvedValue({}) },
stream: {
upsert: vi.fn().mockImplementation((args) => {
capturedStreamUpsert = args;
return Promise.resolve({ streamId, isActive: true });
}),
},
streamEvent: {
findUnique: vi.fn().mockResolvedValue(null),
upsert: vi.fn().mockResolvedValue({ id: 'event-zero-rate' }),
},
};

(prisma.$transaction as ReturnType<typeof vi.fn>).mockImplementation((cb) => cb(mockTx));

// Must not throw
await expect(
(worker as any).handleStreamCreated(mockEvent, mockEvent.topic![1])
).resolves.not.toThrow();

// Stream was persisted
expect(mockTx.stream.upsert).toHaveBeenCalledTimes(1);

// endTime must be null — never computed via division
expect(capturedStreamUpsert?.create?.endTime).toBeNull();

// StreamEvent row was also persisted
expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1);
});

it('should handle duplicate fee collection events', async () => {
const eventId = 'test-fee-event';
const txHash = 'test-fee-tx-hash';
Expand Down
Loading