Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: subscriptions not closing after done #1793

Merged
merged 14 commits into from
Feb 26, 2024
5 changes: 5 additions & 0 deletions .changeset/eighty-pumpkins-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fuel-ts/account": patch
---

Fixed subscriptions hanging when not closed by user even after connection is closed
117 changes: 59 additions & 58 deletions packages/account/src/providers/fuel-graphql-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,75 +7,76 @@ type FuelGraphQLSubscriberOptions = {
query: DocumentNode;
variables?: Record<string, unknown>;
fetchFn: typeof fetch;
abortController?: AbortController;
};

class FuelSubscriptionStream implements TransformStream {
readable: ReadableStream<FuelError | Record<string, unknown>>;
writable: WritableStream<Uint8Array>;
private readableStreamController!: ReadableStreamController<FuelError | Record<string, unknown>>;
export class FuelGraphqlSubscriber implements AsyncIterator<unknown> {
private stream!: ReadableStreamDefaultReader<Uint8Array>;
private static textDecoder = new TextDecoder();

constructor() {
this.readable = new ReadableStream({
start: (controller) => {
this.readableStreamController = controller;
},
});
public constructor(private options: FuelGraphQLSubscriberOptions) {}

this.writable = new WritableStream<Uint8Array>({
write: (bytes) => {
const text = FuelSubscriptionStream.textDecoder.decode(bytes);
// the fuel node sends keep-alive messages that should be ignored
if (text.startsWith('data:')) {
const { data, errors } = JSON.parse(text.split('data:')[1]);
if (Array.isArray(errors)) {
this.readableStreamController.enqueue(
new FuelError(
FuelError.CODES.INVALID_REQUEST,
errors.map((err) => err.message).join('\n\n')
)
);
} else {
this.readableStreamController.enqueue(data);
}
}
private async setStream() {
const { url, query, variables, fetchFn } = this.options;
danielbate marked this conversation as resolved.
Show resolved Hide resolved

const response = await fetchFn(`${url}-sub`, {
method: 'POST',
body: JSON.stringify({
query: print(query),
variables,
}),
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
},
});

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.stream = response.body!.getReader();
}
}

export async function* fuelGraphQLSubscriber({
url,
variables,
query,
fetchFn,
}: FuelGraphQLSubscriberOptions) {
const response = await fetchFn(`${url}-sub`, {
method: 'POST',
body: JSON.stringify({
query: print(query),
variables,
}),
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
},
});
async next(): Promise<IteratorResult<unknown, unknown>> {
arboleya marked this conversation as resolved.
Show resolved Hide resolved
if (!this.stream) {
await this.setStream();
}

// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await this.stream.read();
if (done) {
return { value, done };
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const subscriptionStreamReader = response
.body!.pipeThrough(new FuelSubscriptionStream())
.getReader();
const text = FuelGraphqlSubscriber.textDecoder.decode(value);

while (true) {
const { value, done } = await subscriptionStreamReader.read();
if (value instanceof FuelError) {
throw value;
}
yield value;
if (done) {
break;
// We don't care about responses that don't start with 'data:' like keep-alive messages
if (!text.startsWith('data:')) {
// eslint-disable-next-line no-continue
continue;
}

const { data, errors } = JSON.parse(text.split('data:')[1]);

if (Array.isArray(errors)) {
throw new FuelError(
FuelError.CODES.INVALID_REQUEST,
errors.map((err) => err.message).join('\n\n')
danielbate marked this conversation as resolved.
Show resolved Hide resolved
);
}

return { value: data, done: false };
}
}

/**
* Gets called when `break` is called in a `for-await-of` loop.
*/
async return(): Promise<IteratorResult<unknown, undefined>> {
await this.stream.cancel();
this.stream.releaseLock();
return { done: true, value: undefined };
}

[Symbol.asyncIterator](): AsyncIterator<unknown, unknown, undefined> {
return this;
}
danielbate marked this conversation as resolved.
Show resolved Hide resolved
}
4 changes: 2 additions & 2 deletions packages/account/src/providers/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import type {
import type { Coin } from './coin';
import type { CoinQuantity, CoinQuantityLike } from './coin-quantity';
import { coinQuantityfy } from './coin-quantity';
import { fuelGraphQLSubscriber } from './fuel-graphql-subscriber';
import { FuelGraphqlSubscriber } from './fuel-graphql-subscriber';
import { MemoryCache } from './memory-cache';
import type { Message, MessageCoin, MessageProof, MessageStatus } from './message';
import type { ExcludeResourcesOption, Resource } from './resource';
Expand Down Expand Up @@ -449,7 +449,7 @@ export default class Provider {
const isSubscription = opDefinition?.operation === 'subscription';

if (isSubscription) {
return fuelGraphQLSubscriber({
return new FuelGraphqlSubscriber({
url: this.url,
query,
fetchFn: (url, requestInit) =>
Expand Down
30 changes: 30 additions & 0 deletions packages/fuel-gauge/src/edge-cases.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { generateTestWallet } from '@fuel-ts/account/test-utils';
import { BaseAssetId, FUEL_NETWORK_URL, Provider, TransactionResponse, Wallet } from 'fuels';

import { getSetupContract } from './utils';

/**
Expand All @@ -9,4 +12,31 @@ describe('Edge Cases', () => {

expect((await contract.functions.new().call()).value.toNumber()).toEqual(12345);
});

test("SSE subscriptions that are closed by the node don't hang a for-await-of loop", async () => {
const provider = await Provider.create(FUEL_NETWORK_URL);
const adminWallet = await generateTestWallet(provider, [[500_000]]);

const destination = Wallet.generate({
provider,
});

const { id: transactionId } = await adminWallet.transfer(
destination.address,
100,
BaseAssetId,
{ gasPrice: provider.getGasConfig().minGasPrice, gasLimit: 10_000 }
);

const response = new TransactionResponse(transactionId, provider);

await response.waitForResult();

const subsciption = provider.operations.statusChange({ transactionId });

// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const iterator of subsciption) {
// we leave this intentionally empty so that we test that the subscription will end the loop when the connection is closed
}
});
});
91 changes: 89 additions & 2 deletions packages/fuel-gauge/src/transaction-response.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,75 @@ import {
randomBytes,
WalletUnlocked,
} from 'fuels';
import type { MockInstance } from 'vitest';

async function verifyKeepAliveMessageWasSent(subscriptionStream: ReadableStream<Uint8Array>) {
const decoder = new TextDecoder();
const reader = subscriptionStream.getReader();
let hasKeepAliveMessage = false;
do {
const { value, done } = await reader.read();
if (done) {
break;
}
const text = decoder.decode(value);
if (text === ':keep-alive-text\n\n') {
hasKeepAliveMessage = true;
}
} while (!hasKeepAliveMessage);

// The keep-alive message is sent every 15 seconds,
// and this assertion verifies that it was indeed sent.
// if this fails, check if the duration was changed on the fuel-core side.
// As of the time of writing, the latest permalink where this info can be found is:
// https://github.com/FuelLabs/fuel-core/blob/bf1b22f47c58a9d078676c5756c942d839f38916/crates/fuel-core/src/graphql_api/service.rs#L247
// To get the actual latest info you need to check out the master branch:
// https://github.com/FuelLabs/fuel-core/blob/master/crates/fuel-core/src/graphql_api/service.rs#L247
// This link can fail because master can change.
arboleya marked this conversation as resolved.
Show resolved Hide resolved
expect(hasKeepAliveMessage).toBe(true);
}

function getSubscriptionStreamFromFetch(streamHolder: { stream: ReadableStream<Uint8Array> }) {
function getFetchMock(
fetchSpy: MockInstance<
[input: RequestInfo | URL, init?: RequestInit | undefined],
Promise<Response>
>
) {
return async (...args: Parameters<typeof fetch>) => {
/**
* We need to restore the original fetch implementation so that fetching is possible
* We then get the response and mock the fetch implementation again
* So that the mock can be used for the next fetch call
*/
fetchSpy.mockRestore();
const r = await fetch(...args);
fetchSpy.mockImplementation(getFetchMock(fetchSpy));

const isSubscriptionCall = args[0].toString().endsWith('graphql-sub');
if (!isSubscriptionCall) {
return r;
}

/**
* Tee duplicates a stream and all writes happen to both streams.
nedsalk marked this conversation as resolved.
Show resolved Hide resolved
* We can thus use one stream to verify the keep-alive message was sent
* and pass the other forward in place of the original stream,
* thereby not affecting the response at all.
* */
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const [stream1, stream2] = r.body!.tee();
// eslint-disable-next-line no-param-reassign
streamHolder.stream = stream1;
return new Response(stream2);
};
}

const fetchSpy = vi.spyOn(global, 'fetch');

fetchSpy.mockImplementation(getFetchMock(fetchSpy));
arboleya marked this conversation as resolved.
Show resolved Hide resolved
return streamHolder;
}

/**
* @group node
Expand Down Expand Up @@ -87,7 +156,17 @@ describe('TransactionSummary', () => {

it('should ensure waitForResult always waits for the transaction to be processed', async () => {
const { cleanup, ip, port } = await launchNode({
args: ['--poa-interval-period', '750ms', '--poa-instant', 'false'],
/**
* This is set to so long in order to test keep-alive message handling as well.
* Keep-alive messages are sent every 15s.
* It is very important to test this because the keep-alive messages are not sent in the same format as the other messages
* and it is reasonable to expect subscriptions lasting more than 15 seconds.
* We need a proper integration test for this
* because if the keep-alive message changed in any way between fuel-core versions and we missed it,
* all our subscriptions would break.
* We need at least one long test to ensure that the keep-alive messages are handled correctly.
* */
args: ['--poa-instant', 'false', '--poa-interval-period', '17sec'],
});
const nodeProvider = await Provider.create(`http://${ip}:${port}/graphql`);

Expand All @@ -108,11 +187,19 @@ describe('TransactionSummary', () => {

expect(response.gqlTransaction?.status?.type).toBe('SubmittedStatus');

const subscriptionStreamHolder = {
stream: new ReadableStream<Uint8Array>(),
};

getSubscriptionStreamFromFetch(subscriptionStreamHolder);

await response.waitForResult();

expect(response.gqlTransaction?.status?.type).toEqual('SuccessStatus');
expect(response.gqlTransaction?.id).toBe(transactionId);

await verifyKeepAliveMessageWasSent(subscriptionStreamHolder.stream);

cleanup();
});
}, 18500);
});