From 7b48097d5d3347482e15bc3ee0546a6005bdb1cf Mon Sep 17 00:00:00 2001 From: Hagai Cohen Date: Fri, 10 Feb 2017 11:53:09 +0200 Subject: [PATCH] chore(ws-core): refactor underlying protocol See #272 for more info. --- .../src/renderGraphiQL.ts | 57 ++++++--- .../graphql-server-reactive-core/package.json | 1 - .../src/RequestsManager.ts | 119 ++++++++++++------ .../graphql-server-reactive-core/src/index.ts | 3 +- .../src/messageTypes.ts | 21 ++++ packages/graphql-server-ws/src/index.ts | 16 +-- 6 files changed, 152 insertions(+), 65 deletions(-) create mode 100644 packages/graphql-server-reactive-core/src/messageTypes.ts diff --git a/packages/graphql-server-module-graphiql/src/renderGraphiQL.ts b/packages/graphql-server-module-graphiql/src/renderGraphiQL.ts index fc5073ffb26..c0671a22596 100644 --- a/packages/graphql-server-module-graphiql/src/renderGraphiQL.ts +++ b/packages/graphql-server-module-graphiql/src/renderGraphiQL.ts @@ -46,20 +46,18 @@ export function renderGraphiQL(data: GraphiQLData): string { const operationName = data.operationName; const passHeader = data.passHeader ? data.passHeader : ''; const fetchLibrary = isWs ? - ` - ` : + `` : ``; const fetcher = isWs ? ` - Rx.Observable.prototype.fromDiff = rxjsDiffOperator.fromDiff; var reqId = 0; - function graphQLFetcher(graphQLParams) { - var localReqId = reqId++; + function messageObservable(localReqId, graphQLParams) { return new Rx.Observable(function (observer) { - var payload = JSON.stringify(Object.assign({}, graphQLParams, { - action: "request", + var payload = JSON.stringify({ + type: "start", id: localReqId, - })); + payload: graphQLParams, + }); var isOpen = false; var ws = new WebSocket(fetchURL); @@ -70,8 +68,12 @@ export function renderGraphiQL(data: GraphiQLData): string { ws.onerror = function(e) { observer.error(new Error("WebSocket error")); }; - ws.onclose = function () { - observer.complete(); + ws.onclose = function (closeEvent) { + if ( closeEvent.wasClean ) { + observer.complete(); + } else { + observer.error(new Error('Connection closed unexpectedly')); + } }; ws.onopen = function () { isOpen = true; @@ -79,17 +81,42 @@ export function renderGraphiQL(data: GraphiQLData): string { }; return function () { - if ( isOpen == true ) { - ws.send(JSON.stringify({ id: localReqId, action: "cancel" })); + if ( (isOpen == true) && (ws.readyState === 1) ) { + ws.send(JSON.stringify({ id: localReqId, type: "stop" })); } ws.close(); }; }) + .retryWhen((e) => e.delay(500)) + .share() .map(JSON.parse) .filter(function (v) { return v.id == localReqId }) - .fromDiff() - .catch((e) => { - return Rx.Observable.of({ errors: [e.message] }); + .catch(function (e) { + return Rx.Observable.of({ id: localReqId, type: 'error', payload: e }); + }); + } + + function graphQLFetcher(graphQLParams) { + var localReqId = reqId++; + return new Rx.Observable(function (observer) { + return messageObservable(localReqId, graphQLParams) + .subscribe(function(packet) { + switch (packet.type) { + case 'data': + return observer.next(packet.payload); + case 'error': + observer.error(new Error(packet.payload)); + return observer.complete(); + case 'complete': + return observer.complete(); + default: + observer.error(new Error('Recieved invalid message type from server')); + } + }, function(e) { + observer.error(e); + }, function() { + observer.complete(); + }); }); } ` : ` diff --git a/packages/graphql-server-reactive-core/package.json b/packages/graphql-server-reactive-core/package.json index 381719bb336..838d8e26a74 100644 --- a/packages/graphql-server-reactive-core/package.json +++ b/packages/graphql-server-reactive-core/package.json @@ -38,7 +38,6 @@ }, "dependencies": { "graphql-server-core": "^0.6.0", - "observable-diff-operator": "^0.1.1", "symbol-observable": "^1.0.4" } } diff --git a/packages/graphql-server-reactive-core/src/RequestsManager.ts b/packages/graphql-server-reactive-core/src/RequestsManager.ts index fd3c8b82a13..b26ec0a0945 100644 --- a/packages/graphql-server-reactive-core/src/RequestsManager.ts +++ b/packages/graphql-server-reactive-core/src/RequestsManager.ts @@ -1,27 +1,34 @@ import { IObservable, Observer, Observable, Subscription } from './Observable'; import { ReactiveQueryOptions, runQueryReactive } from './runQueryReactive'; import { ReactiveGraphQLOptions } from './reactiveOptions'; -import { toDiffObserver, IObservableDiff } from 'observable-diff-operator'; import { formatError, ExecutionResult } from 'graphql'; - -export interface ReactiveRequest { +import { + RGQL_MSG_ERROR, + RGQL_MSG_COMPLETE, + RGQL_MSG_DATA, + RGQL_MSG_START, + RGQL_MSG_STOP, + RGQLMessageType, + RGQLPayloadType, + RGQLPayloadStart, +} from './messageTypes'; + +export interface RGQLPacket { id: number; // Per socket increasing number - action: 'request' | 'cancel'; - query?: string; // GraphQL Printed Query. - variables?: any; // GraphQL variables. - operationName?: string; // GraphQL operationName + type: RGQLMessageType; + payload: RGQLPayloadType, } -export interface ReactiveMessage { - requestParams: ReactiveRequest; +export interface ReactiveRequest { + packet: RGQLPacket; graphqlOptions?: ReactiveGraphQLOptions; } export class RequestsManager { protected requests = {}; - public handleRequest(message: ReactiveMessage, onMessageObserver: Observer) { - this._subscribeRequest(this._prepareRequest(message), message.requestParams.id, onMessageObserver); + public handleRequest(message: ReactiveRequest, onMessageObserver: Observer) { + this._subscribeRequest(this._prepareRequest(message), message.packet.id, onMessageObserver); } public unsubscribeAll() { @@ -30,9 +37,9 @@ export class RequestsManager { }); } - protected _subscribeRequest(obs: IObservable, key: number, onMessageObserver: Observer): void { - const diffObs = new Observable((observer) => { - return obs.subscribe(toDiffObserver({ + protected _prepareRespond(obs: IObservable, key: number): IObservable { + return new Observable((observer) => { + return this._convertRespond(obs).subscribe({ next: (data) => { if ( undefined !== key ) { observer.next(Object.assign(data, { id: key })); @@ -42,13 +49,42 @@ export class RequestsManager { }, error: observer.error, complete: observer.complete, - })); + }); }); + } + protected _convertRespond(obs: IObservable): IObservable { + return new Observable((observer) => { + return obs.subscribe({ + next: (data) => { + observer.next({ + type: RGQL_MSG_DATA, + payload: data, + }); + }, + error: (e) => { + observer.next({ + type: RGQL_MSG_ERROR, + payload: e, + }); + observer.complete(); + }, + complete: () => { + observer.next({ + type: RGQL_MSG_COMPLETE, + }); + observer.complete(); + }, + }); + }); + } + + protected _subscribeRequest(obs: IObservable, key: number, onMessageObserver: Observer): void { + const respondObs = this._prepareRespond(obs, key); if ( key ) { - this.requests[key] = diffObs.subscribe(onMessageObserver); + this.requests[key] = respondObs.subscribe(onMessageObserver); } else { - diffObs.subscribe(onMessageObserver); + respondObs.subscribe(onMessageObserver); } } @@ -59,27 +95,27 @@ export class RequestsManager { } } - protected _prepareRequest({requestParams, graphqlOptions}: ReactiveMessage): IObservable { + protected _prepareRequest({packet, graphqlOptions}: ReactiveRequest): IObservable { const formatErrorFn = graphqlOptions.formatError || formatError; try { - this._validateRequest(requestParams); + this._validateRequest(packet); } catch (e) { return Observable.of({ errors: [formatErrorFn(e)] }); } - if (Array.isArray(requestParams)) { + if (Array.isArray(packet)) { return Observable.of({ errors: [formatErrorFn(new Error('Websocket does not support batching'))] }); } - this._unsubscribe(requestParams.id); + this._unsubscribe(packet.id); - if ( requestParams.action === 'cancel' ) { + if ( packet.type === RGQL_MSG_STOP ) { return Observable.empty(); } - - const query = requestParams.query; - const operationName = requestParams.operationName; - let variables = requestParams.variables; + const graphqlRequest: RGQLPayloadStart = packet.payload; + const query = graphqlRequest.query; + const operationName = graphqlRequest.operationName; + let variables = graphqlRequest.variables; if (typeof variables === 'string') { try { @@ -111,26 +147,29 @@ export class RequestsManager { return runQueryReactive(params); } - protected _validateRequest(request: ReactiveRequest) { - if ( undefined === request.id ) { + protected _validateRequest(packet: RGQLPacket) { + if ( undefined === packet.id ) { throw new Error('Message missing id field'); } - if ( undefined === request.action ) { - throw new Error('Message missing action field'); + if ( undefined === packet.type ) { + throw new Error('Message missing type field'); } - switch ( request.action ) { - case 'request': - if ( undefined === request.query ) { - throw new Error('Message missing query field'); + switch ( packet.type ) { + case RGQL_MSG_START: + if ( undefined === packet.payload ) { + throw new Error('Message missing payload field'); + } + if ( undefined === ( packet.payload).query ) { + throw new Error('Message missing payload.query field'); } return; - case 'cancel': - // Allgood :) - return; - default: - throw new Error('Invalid action used'); - } + case RGQL_MSG_STOP: + // Nothing much to check, no payload. + return; + default: + throw new Error('Invalid type used'); + } } } diff --git a/packages/graphql-server-reactive-core/src/index.ts b/packages/graphql-server-reactive-core/src/index.ts index 9809add3056..d4383d0c1c7 100644 --- a/packages/graphql-server-reactive-core/src/index.ts +++ b/packages/graphql-server-reactive-core/src/index.ts @@ -1,5 +1,6 @@ export * from 'graphql-server-core'; +export * from './messageTypes'; export { Observable, IObservable, Observer, Subscription } from './Observable'; export { runQueryReactive, ReactiveQueryOptions } from './runQueryReactive'; export { RGQLExecutor, RGQLExecuteFunction, ReactiveGraphQLOptions } from './reactiveOptions'; -export { RequestsManager, ReactiveRequest, ReactiveMessage } from './RequestsManager'; +export { RequestsManager, ReactiveRequest, RGQLPacket } from './RequestsManager'; diff --git a/packages/graphql-server-reactive-core/src/messageTypes.ts b/packages/graphql-server-reactive-core/src/messageTypes.ts new file mode 100644 index 00000000000..261fc716e41 --- /dev/null +++ b/packages/graphql-server-reactive-core/src/messageTypes.ts @@ -0,0 +1,21 @@ +import { ExecutionResult } from 'graphql'; +export const RGQL_MSG_ERROR = 'error'; +export const RGQL_MSG_COMPLETE = 'complete'; +export const RGQL_MSG_DATA = 'data'; +export const RGQL_MSG_START = 'start'; +export const RGQL_MSG_STOP = 'stop'; +export type RGQLMessageType = ( + 'error' | + 'complete' | + 'data' | + 'start' | + 'stop' +); +export type RGQLPayloadError = Error; +export type RGQLPayloadData = ExecutionResult; +export interface RGQLPayloadStart { + query?: string; + variables?: any; + operationName?: string; +}; +export type RGQLPayloadType = RGQLPayloadError | RGQLPayloadData | RGQLPayloadStart; diff --git a/packages/graphql-server-ws/src/index.ts b/packages/graphql-server-ws/src/index.ts index 2fdba9836aa..a415c3d8acf 100644 --- a/packages/graphql-server-ws/src/index.ts +++ b/packages/graphql-server-ws/src/index.ts @@ -6,14 +6,14 @@ import { resolveGraphqlOptions, ReactiveGraphQLOptions, ReactiveRequest, - ReactiveMessage, + RGQLPacket, RequestsManager, } from 'graphql-server-reactive-core'; export { ReactiveGraphQLOptions }; import * as Websocket from 'ws'; -export interface WSMessageParams extends ReactiveMessage { +export interface WSRequest extends ReactiveRequest { flags: { binary: boolean; }; @@ -27,14 +27,14 @@ export interface WSHandler { (ws: Websocket): void; } -function ObservableFromWs(ws: Websocket, graphqlOptions: ReactiveGraphQLOptions): IObservable { - return new Observable((observer) => { +function ObservableFromWs(ws: Websocket, graphqlOptions: ReactiveGraphQLOptions): IObservable { + return new Observable((observer) => { let nextListener = (data: any, flags: {binary: boolean}) => { - let request: WSMessageParams; + let request: WSRequest; try { try { request = { - requestParams: JSON.parse(data) as ReactiveRequest, + packet: JSON.parse(data) as RGQLPacket, graphqlOptions, flags: flags, }; @@ -82,8 +82,8 @@ export function graphqlWs(options: ReactiveGraphQLOptions | WSGraphQLOptionsFunc .then((graphqlOptions: ReactiveGraphQLOptions) => { const requests = new RequestsManager(); const subscription = ObservableFromWs(ws, graphqlOptions).subscribe({ - next: (message) => { - requests.handleRequest(message, { + next: (request) => { + requests.handleRequest(request, { next: (data) => ws.send(JSON.stringify(data)), error: (e) => ws.close(1008, e.message), complete: () => {/* noop */},