Skip to content

Commit

Permalink
feat(url-loader): use fetch-event-source instead of sse-z (#2906)
Browse files Browse the repository at this point in the history
* feat(url-loader): use fetch-event-source instead of sse-z

* Do not use Node URL

* Cleanup

* Small fixes
  • Loading branch information
ardatan committed May 3, 2021
1 parent 20d2c7b commit 77d63ab
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 57 deletions.
5 changes: 5 additions & 0 deletions .changeset/wet-hornets-think.md
@@ -0,0 +1,5 @@
---
'@graphql-tools/url-loader': minor
---

feat(url-loader): use fetch-event-source instead of sse-z
4 changes: 1 addition & 3 deletions packages/loaders/url/package.json
Expand Up @@ -26,7 +26,6 @@
},
"devDependencies": {
"@types/extract-files": "8.1.0",
"@types/eventsource": "1.1.5",
"@types/ws": "7.4.2",
"graphql-upload": "11.0.0",
"mock-http": "1.1.0",
Expand All @@ -36,18 +35,17 @@
"@graphql-tools/delegate": "^7.0.1",
"@graphql-tools/utils": "^7.8.1",
"@graphql-tools/wrap": "^7.0.4",
"@microsoft/fetch-event-source": "2.0.1",
"@types/websocket": "1.0.2",
"abort-controller": "3.0.0",
"cross-fetch": "3.1.4",
"@ardatan/eventsource": "1.1.2",
"extract-files": "9.0.0",
"form-data": "4.0.0",
"graphql-ws": "^4.4.1",
"lodash": "4.17.21",
"meros": "1.1.4",
"is-promise": "4.0.0",
"isomorphic-ws": "4.0.1",
"sse-z": "0.3.0",
"subscriptions-transport-ws": "^0.9.18",
"sync-fetch": "0.3.0",
"tslib": "~2.2.0",
Expand Down
128 changes: 91 additions & 37 deletions packages/loaders/url/src/index.ts
Expand Up @@ -17,7 +17,7 @@ import {
withCancel,
} from '@graphql-tools/utils';
import { isWebUri } from 'valid-url';
import { fetch as crossFetch, Headers } from 'cross-fetch';
import { fetch as crossFetch } from 'cross-fetch';
import { SubschemaConfig } from '@graphql-tools/delegate';
import { introspectSchema, wrapSchema } from '@graphql-tools/wrap';
import { ClientOptions, createClient } from 'graphql-ws';
Expand All @@ -26,9 +26,7 @@ import syncFetch from 'sync-fetch';
import isPromise from 'is-promise';
import { extractFiles, isExtractableFile } from 'extract-files';
import FormData from 'form-data';
import '@ardatan/eventsource';
import { Subscription, SubscriptionOptions } from 'sse-z';
import { URL } from 'url';
import { fetchEventSource, FetchEventSourceInit } from '@microsoft/fetch-event-source';

This comment has been minimized.

Copy link
@MoritzGruber

MoritzGruber Jun 23, 2021

"@microsoft/fetch-event-source": "2.0.1" only works in the browser but not in a nodejs enviroment

for example
if you try to use graphql-tools in a nodejs enviroment this wont work since document is not defined inside @microsoft/fetch-event-source

@ardatan

import { ConnectionParamsOptions, SubscriptionClient as LegacySubscriptionClient } from 'subscriptions-transport-ws';
import AbortController from 'abort-controller';
import { meros } from 'meros';
Expand Down Expand Up @@ -118,7 +116,7 @@ export interface LoadFromUrlOptions extends SingleFileOptions, Partial<Introspec
/**
* Additional options to pass to the constructor of the underlying EventSource instance.
*/
eventSourceOptions?: SubscriptionOptions['eventSourceOptions'];
eventSourceOptions?: FetchEventSourceInit;
/**
* Handle URL as schema SDL
*/
Expand Down Expand Up @@ -202,6 +200,39 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
return form;
}

prepareGETUrl({
baseUrl,
query,
variables,
operationName,
}: {
baseUrl: string;
query: string;
variables: any;
operationName?: string;
}) {
const HTTP_URL = switchProtocols(baseUrl, {
wss: 'https',
ws: 'http',
});
const dummyHostname = 'https://dummyhostname.com';
const validUrl = HTTP_URL.startsWith('http')
? HTTP_URL
: HTTP_URL.startsWith('/')
? `${dummyHostname}${HTTP_URL}`
: `${dummyHostname}/${HTTP_URL}`;
const urlObj = new URL(validUrl);
urlObj.searchParams.set('query', query);
if (variables && Object.keys(variables).length > 0) {
urlObj.searchParams.set('variables', JSON.stringify(variables));
}
if (operationName) {
urlObj.searchParams.set('operationName', operationName);
}
const finalUrl = urlObj.toString().replace(dummyHostname, '');
return finalUrl;
}

buildExecutor(options: BuildExecutorOptions<SyncFetchFn>): SyncExecutor;
buildExecutor(options: BuildExecutorOptions<AsyncFetchFn>): AsyncExecutor;
buildExecutor({
Expand Down Expand Up @@ -240,14 +271,7 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
const query = print(document);
switch (method) {
case 'GET':
const dummyHostname = 'https://dummyhostname.com';
const validUrl = HTTP_URL.startsWith('http') ? HTTP_URL : `${dummyHostname}/${HTTP_URL}`;
const urlObj = new URL(validUrl);
urlObj.searchParams.set('query', query);
if (variables && Object.keys(variables).length > 0) {
urlObj.searchParams.set('variables', JSON.stringify(variables));
}
const finalUrl = urlObj.toString().replace(dummyHostname, '');
const finalUrl = this.prepareGETUrl({ baseUrl: pointer, query, variables });
fetchResult = fetch(finalUrl, {
method: 'GET',
credentials: 'include',
Expand Down Expand Up @@ -391,34 +415,58 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
};
}

buildSSESubscriber(pointer: string, eventSourceOptions?: SubscriptionOptions['eventSourceOptions']): Subscriber {
return async ({ document, variables }: { document: DocumentNode; variables: any }) => {
buildSSESubscriber(
pointer: string,
extraHeaders: Headers,
fetch: AsyncFetchFn,
options: FetchEventSourceInit
): Subscriber {
return async ({ document, variables, ...rest }: { document: DocumentNode; variables: any }) => {
const controller = new AbortController();
const query = print(document);
const finalUrl = this.prepareGETUrl({ baseUrl: pointer, query, variables });
const headers = this.getHeadersFromOptions(extraHeaders, {
document,
variables,
...rest,
});
return observableToAsyncIterable({
subscribe: observer => {
const subscription = new Subscription({
url: pointer,
searchParams: {
query,
variables: JSON.stringify(variables),
},
eventSourceOptions: {
// Ensure cookies are included with the request
withCredentials: true,
...eventSourceOptions,
},
onNext: data => {
const parsedData = JSON.parse(data);
observer.next(parsedData);
fetchEventSource(finalUrl, {
credentials: 'include',
headers,
method: 'GET',
onerror: error => {
observer.error(error);
},
onError: data => {
observer.error(data);
onmessage: event => {
observer.next(JSON.parse(event.data || '{}'));
},
onComplete: () => {
observer.complete();
onopen: async response => {
const contentType = response.headers.get('content-type');
if (!contentType?.startsWith('text/event-stream')) {
let error;
try {
const { errors } = await response.json();
error = errors[0];
} catch (error) {
// Failed to parse body
}

if (error) {
throw error;
}

throw new Error(`Expected content-type to be ${'text/event-stream'} but got "${contentType}".`);
}
},
fetch,
signal: controller.signal,
...options,
});
return subscription;
return {
unsubscribe: () => controller.abort(),
};
},
});
};
Expand Down Expand Up @@ -450,7 +498,7 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
return customFetch as any;
}
}
return async ? crossFetch : syncFetch;
return async ? typeof fetch === 'undefined' ? crossFetch : fetch : syncFetch;
}

private getHeadersFromOptions(customHeaders: Headers, executionParams: ExecutionParams): Record<string, string> {
Expand Down Expand Up @@ -517,7 +565,7 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {

const subscriptionsEndpoint = options.subscriptionsEndpoint || pointer;
if (options.useSSEForSubscription) {
subscriber = this.buildSSESubscriber(subscriptionsEndpoint, options.eventSourceOptions);
subscriber = this.buildSSESubscriber(subscriptionsEndpoint, options.headers, fetch, options.eventSourceOptions);
} else {
const webSocketImpl = await this.getWebSocketImpl(options, asyncImport);
const connectionParams = () => ({ headers: this.getHeadersFromOptions(options.headers, {} as any) });
Expand Down Expand Up @@ -552,7 +600,13 @@ export class UrlLoader implements DocumentLoader<LoadFromUrlOptions> {
const subscriptionsEndpoint = options.subscriptionsEndpoint || pointer;
let subscriber: Subscriber;
if (options.useSSEForSubscription) {
subscriber = this.buildSSESubscriber(subscriptionsEndpoint, options.eventSourceOptions);
const asyncFetchFn: any = (...args: any[]) => this.getFetch(options?.customFetch, asyncImport, true).then((asyncFetch: any) => asyncFetch(...args));
subscriber = this.buildSSESubscriber(
subscriptionsEndpoint,
options.headers,
asyncFetchFn,
options.eventSourceOptions
);
} else {
const webSocketImpl = this.getWebSocketImpl(options, syncImport);
const connectionParams = () => ({ headers: this.getHeadersFromOptions(options.headers, {} as any) });
Expand Down
22 changes: 5 additions & 17 deletions yarn.lock
Expand Up @@ -296,13 +296,6 @@
tslib "^1.11.1"
yargs "15.3.1"

"@ardatan/eventsource@1.1.2":
version "1.1.2"
resolved "https://registry.yarnpkg.com/@ardatan/eventsource/-/eventsource-1.1.2.tgz#80c2f3a53a44a0bfe2d6f91f523daf1da0d3d887"
integrity sha512-ch/B82cVVUosCRX4Nn7b+scWhYfHFFHl8TbsaaDTUlPqH9JxXWmAGo1JHzWr2no0mcW+jW4qHUpoL4X9JYVEBw==
dependencies:
original "^1.0.0"

"@babel/code-frame@7.10.4":
version "7.10.4"
resolved "https://registry.yarnpkg.com/@babel/code-frame/-/code-frame-7.10.4.tgz#168da1a36e90da68ae8d49c0f1b48c7c6249213a"
Expand Down Expand Up @@ -2600,6 +2593,11 @@
resolved "https://registry.yarnpkg.com/@mdx-js/util/-/util-1.6.22.tgz#219dfd89ae5b97a8801f015323ffa4b62f45718b"
integrity sha512-H1rQc1ZOHANWBvPcW+JpGwr+juXSxM8Q8YCkm3GhZd8REu1fHR3z99CErO1p9pkcfcxZnMdIZdIsXkOHY0NilA==

"@microsoft/fetch-event-source@2.0.1":
version "2.0.1"
resolved "https://registry.yarnpkg.com/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz#9ceecc94b49fbaa15666e38ae8587f64acce007d"
integrity sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==

"@nodelib/fs.scandir@2.1.4":
version "2.1.4"
resolved "https://registry.yarnpkg.com/@nodelib/fs.scandir/-/fs.scandir-2.1.4.tgz#d4b3549a5db5de2683e0c1071ab4f140904bbf69"
Expand Down Expand Up @@ -2899,11 +2897,6 @@
resolved "https://registry.yarnpkg.com/@types/estree/-/estree-0.0.39.tgz#e177e699ee1b8c22d23174caaa7422644389509f"
integrity sha512-EYNwp3bU+98cpU4lAWYYL7Zz+2gryWH1qbdDTidVd6hkiR6weksdbMadyXKXNPEkQFhXM+hVO9ZygomHXp+AIw==

"@types/eventsource@1.1.5":
version "1.1.5"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.5.tgz#408e9b45efb176c8bea672ab58c81e7ab00d24bc"
integrity sha512-BA9q9uC2PAMkUS7DunHTxWZZaVpeNzDG8lkBxcKwzKJClfDQ4Z59/Csx7HSH/SIqFN2JWh0tAKAM6k/wRR0OZg==

"@types/express-serve-static-core@^4.17.18":
version "4.17.19"
resolved "https://registry.yarnpkg.com/@types/express-serve-static-core/-/express-serve-static-core-4.17.19.tgz#00acfc1632e729acac4f1530e9e16f6dd1508a1d"
Expand Down Expand Up @@ -13924,11 +13917,6 @@ sprintf-js@~1.0.2:
resolved "https://registry.yarnpkg.com/sprintf-js/-/sprintf-js-1.0.3.tgz#04e6926f662895354f3dd015203633b857297e2c"
integrity sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=

sse-z@0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/sse-z/-/sse-z-0.3.0.tgz#e215db7c303d6c4a4199d80cb63811cc28fa55b9"
integrity sha512-jfcXynl9oAOS9YJ7iqS2JMUEHOlvrRAD+54CENiWnc4xsuVLQVSgmwf7cwOTcBd/uq3XkQKBGojgvEtVXcJ/8w==

sshpk@^1.7.0:
version "1.16.1"
resolved "https://registry.yarnpkg.com/sshpk/-/sshpk-1.16.1.tgz#fb661c0bef29b39db40769ee39fa70093d6f6877"
Expand Down

0 comments on commit 77d63ab

Please sign in to comment.