Skip to content

Commit

Permalink
chore(ws-core): refactor underlying protocol
Browse files Browse the repository at this point in the history
See apollographql#272 for more info.
  • Loading branch information
DxCx committed Mar 22, 2017
1 parent e3a69bb commit 1b45921
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 65 deletions.
57 changes: 42 additions & 15 deletions packages/graphql-server-module-graphiql/src/renderGraphiQL.ts
Expand Up @@ -46,20 +46,18 @@ export function renderGraphiQL(data: GraphiQLData): string {
const operationName = data.operationName;
const passHeader = data.passHeader ? data.passHeader : '';
const fetchLibrary = isWs ?
`<script src="//npmcdn.com/@reactivex/rxjs@5.1.0/dist/global/Rx.min.js"></script>
<script src="//npmcdn.com/rxjs-diff-operator@0.1.1/dist/main.browser.js"></script>` :
`<script src="//npmcdn.com/@reactivex/rxjs@5.1.0/dist/global/Rx.min.js"></script>` :
`<script src="//cdn.jsdelivr.net/fetch/0.9.0/fetch.min.js"></script>`;
const fetcher = isWs ? `
Rx.Observable.prototype.fromDiff = rxjsDiffOperator.fromDiff;
var reqId = 0;
function graphQLFetcher(graphQLParams) {
var localReqId = reqId++;
function messageObservable(localReqId, graphQLParams) {
return new Rx.Observable(function (observer) {
var payload = JSON.stringify(Object.assign({}, graphQLParams, {
action: "request",
var payload = JSON.stringify({
type: "start",
id: localReqId,
}));
payload: graphQLParams,
});
var isOpen = false;
var ws = new WebSocket(fetchURL);
Expand All @@ -70,26 +68,55 @@ export function renderGraphiQL(data: GraphiQLData): string {
ws.onerror = function(e) {
observer.error(new Error("WebSocket error"));
};
ws.onclose = function () {
observer.complete();
ws.onclose = function (closeEvent) {
if ( closeEvent.wasClean ) {
observer.complete();
} else {
observer.error(new Error('Connection closed unexpectedly'));
}
};
ws.onopen = function () {
isOpen = true;
ws.send(payload);
};
return function () {
if ( isOpen == true ) {
ws.send(JSON.stringify({ id: localReqId, action: "cancel" }));
if ( (isOpen == true) && (ws.readyState === 1) ) {
ws.send(JSON.stringify({ id: localReqId, type: "stop" }));
}
ws.close();
};
})
.retryWhen((e) => e.delay(500))
.share()
.map(JSON.parse)
.filter(function (v) { return v.id == localReqId })
.fromDiff()
.catch((e) => {
return Rx.Observable.of({ errors: [e.message] });
.catch(function (e) {
return Rx.Observable.of({ id: localReqId, type: 'error', payload: e });
});
}
function graphQLFetcher(graphQLParams) {
var localReqId = reqId++;
return new Rx.Observable(function (observer) {
return messageObservable(localReqId, graphQLParams)
.subscribe(function(packet) {
switch (packet.type) {
case 'data':
return observer.next(packet.payload);
case 'error':
observer.error(new Error(packet.payload));
return observer.complete();
case 'complete':
return observer.complete();
default:
observer.error(new Error('Recieved invalid message type from server'));
}
}, function(e) {
observer.error(e);
}, function() {
observer.complete();
});
});
}
` : `
Expand Down
1 change: 0 additions & 1 deletion packages/graphql-server-reactive-core/package.json
Expand Up @@ -38,7 +38,6 @@
},
"dependencies": {
"graphql-server-core": "^0.6.0",
"observable-diff-operator": "^0.1.1",
"symbol-observable": "^1.0.4"
}
}
119 changes: 79 additions & 40 deletions packages/graphql-server-reactive-core/src/RequestsManager.ts
@@ -1,27 +1,34 @@
import { IObservable, Observer, Observable, Subscription } from './Observable';
import { ReactiveQueryOptions, runQueryReactive } from './runQueryReactive';
import { ReactiveGraphQLOptions } from './reactiveOptions';
import { toDiffObserver, IObservableDiff } from 'observable-diff-operator';
import { formatError, ExecutionResult } from 'graphql';

export interface ReactiveRequest {
import {
RGQL_MSG_ERROR,
RGQL_MSG_COMPLETE,
RGQL_MSG_DATA,
RGQL_MSG_START,
RGQL_MSG_STOP,
RGQLMessageType,
RGQLPayloadType,
RGQLPayloadStart,
} from './messageTypes';

export interface RGQLPacket {
id: number; // Per socket increasing number
action: 'request' | 'cancel';
query?: string; // GraphQL Printed Query.
variables?: any; // GraphQL variables.
operationName?: string; // GraphQL operationName
type: RGQLMessageType;
payload: RGQLPayloadType,
}

export interface ReactiveMessage {
requestParams: ReactiveRequest;
export interface ReactiveRequest {
packet: RGQLPacket;
graphqlOptions?: ReactiveGraphQLOptions;
}

export class RequestsManager {
protected requests = {};

public handleRequest(message: ReactiveMessage, onMessageObserver: Observer<IObservableDiff>) {
this._subscribeRequest(this._prepareRequest(message), message.requestParams.id, onMessageObserver);
public handleRequest(message: ReactiveRequest, onMessageObserver: Observer<RGQLPacket>) {
this._subscribeRequest(this._prepareRequest(message), message.packet.id, onMessageObserver);
}

public unsubscribeAll() {
Expand All @@ -30,9 +37,9 @@ export class RequestsManager {
});
}

protected _subscribeRequest(obs: IObservable<ExecutionResult>, key: number, onMessageObserver: Observer<IObservableDiff>): void {
const diffObs = new Observable((observer) => {
return obs.subscribe(toDiffObserver({
protected _prepareRespond(obs: IObservable<ExecutionResult>, key: number): IObservable<RGQLPacket> {
return new Observable((observer) => {
return this._convertRespond(obs).subscribe({
next: (data) => {
if ( undefined !== key ) {
observer.next(Object.assign(data, { id: key }));
Expand All @@ -42,13 +49,42 @@ export class RequestsManager {
},
error: observer.error,
complete: observer.complete,
}));
});
});
}

protected _convertRespond(obs: IObservable<ExecutionResult>): IObservable<RGQLPacket> {
return new Observable((observer) => {
return obs.subscribe({
next: (data) => {
observer.next({
type: RGQL_MSG_DATA,
payload: data,
});
},
error: (e) => {
observer.next({
type: RGQL_MSG_ERROR,
payload: e,
});
observer.complete();
},
complete: () => {
observer.next({
type: RGQL_MSG_COMPLETE,
});
observer.complete();
},
});
});
}

protected _subscribeRequest(obs: IObservable<ExecutionResult>, key: number, onMessageObserver: Observer<RGQLPacket>): void {
const respondObs = this._prepareRespond(obs, key);
if ( key ) {
this.requests[key] = diffObs.subscribe(onMessageObserver);
this.requests[key] = respondObs.subscribe(onMessageObserver);
} else {
diffObs.subscribe(onMessageObserver);
respondObs.subscribe(onMessageObserver);
}
}

Expand All @@ -59,27 +95,27 @@ export class RequestsManager {
}
}

protected _prepareRequest({requestParams, graphqlOptions}: ReactiveMessage): IObservable<ExecutionResult> {
protected _prepareRequest({packet, graphqlOptions}: ReactiveRequest): IObservable<ExecutionResult> {
const formatErrorFn = graphqlOptions.formatError || formatError;

try {
this._validateRequest(requestParams);
this._validateRequest(packet);
} catch (e) {
return Observable.of({ errors: [formatErrorFn(e)] });
}

if (Array.isArray(requestParams)) {
if (Array.isArray(packet)) {
return Observable.of({ errors: [formatErrorFn(new Error('Websocket does not support batching'))] });
}
this._unsubscribe(requestParams.id);
this._unsubscribe(packet.id);

if ( requestParams.action === 'cancel' ) {
if ( packet.type === RGQL_MSG_STOP ) {
return Observable.empty();
}

const query = requestParams.query;
const operationName = requestParams.operationName;
let variables = requestParams.variables;
const graphqlRequest: RGQLPayloadStart = packet.payload;
const query = graphqlRequest.query;
const operationName = graphqlRequest.operationName;
let variables = graphqlRequest.variables;

if (typeof variables === 'string') {
try {
Expand Down Expand Up @@ -111,26 +147,29 @@ export class RequestsManager {
return runQueryReactive(params);
}

protected _validateRequest(request: ReactiveRequest) {
if ( undefined === request.id ) {
protected _validateRequest(packet: RGQLPacket) {
if ( undefined === packet.id ) {
throw new Error('Message missing id field');
}

if ( undefined === request.action ) {
throw new Error('Message missing action field');
if ( undefined === packet.type ) {
throw new Error('Message missing type field');
}

switch ( request.action ) {
case 'request':
if ( undefined === request.query ) {
throw new Error('Message missing query field');
switch ( packet.type ) {
case RGQL_MSG_START:
if ( undefined === packet.payload ) {
throw new Error('Message missing payload field');
}
if ( undefined === (<RGQLPayloadStart> packet.payload).query ) {
throw new Error('Message missing payload.query field');
}
return;
case 'cancel':
// Allgood :)
return;
default:
throw new Error('Invalid action used');
}
case RGQL_MSG_STOP:
// Nothing much to check, no payload.
return;
default:
throw new Error('Invalid type used');
}
}
}
3 changes: 2 additions & 1 deletion packages/graphql-server-reactive-core/src/index.ts
@@ -1,5 +1,6 @@
export * from 'graphql-server-core';
export * from './messageTypes';
export { Observable, IObservable, Observer, Subscription } from './Observable';
export { runQueryReactive, ReactiveQueryOptions } from './runQueryReactive';
export { RGQLExecutor, RGQLExecuteFunction, ReactiveGraphQLOptions } from './reactiveOptions';
export { RequestsManager, ReactiveRequest, ReactiveMessage } from './RequestsManager';
export { RequestsManager, ReactiveRequest, RGQLPacket } from './RequestsManager';
21 changes: 21 additions & 0 deletions packages/graphql-server-reactive-core/src/messageTypes.ts
@@ -0,0 +1,21 @@
import { ExecutionResult } from 'graphql';
export const RGQL_MSG_ERROR = 'error';
export const RGQL_MSG_COMPLETE = 'complete';
export const RGQL_MSG_DATA = 'data';
export const RGQL_MSG_START = 'start';
export const RGQL_MSG_STOP = 'stop';
export type RGQLMessageType = (
'error' |
'complete' |
'data' |
'start' |
'stop'
);
export type RGQLPayloadError = Error;
export type RGQLPayloadData = ExecutionResult;
export interface RGQLPayloadStart {
query?: string;
variables?: any;
operationName?: string;
};
export type RGQLPayloadType = RGQLPayloadError | RGQLPayloadData | RGQLPayloadStart;
16 changes: 8 additions & 8 deletions packages/graphql-server-ws/src/index.ts
Expand Up @@ -6,14 +6,14 @@ import {
resolveGraphqlOptions,
ReactiveGraphQLOptions,
ReactiveRequest,
ReactiveMessage,
RGQLPacket,
RequestsManager,
} from 'graphql-server-reactive-core';
export { ReactiveGraphQLOptions };

import * as Websocket from 'ws';

export interface WSMessageParams extends ReactiveMessage {
export interface WSRequest extends ReactiveRequest {
flags: {
binary: boolean;
};
Expand All @@ -27,14 +27,14 @@ export interface WSHandler {
(ws: Websocket): void;
}

function ObservableFromWs(ws: Websocket, graphqlOptions: ReactiveGraphQLOptions): IObservable<WSMessageParams> {
return new Observable<WSMessageParams>((observer) => {
function ObservableFromWs(ws: Websocket, graphqlOptions: ReactiveGraphQLOptions): IObservable<WSRequest> {
return new Observable<WSRequest>((observer) => {
let nextListener = (data: any, flags: {binary: boolean}) => {
let request: WSMessageParams;
let request: WSRequest;
try {
try {
request = {
requestParams: JSON.parse(data) as ReactiveRequest,
packet: JSON.parse(data) as RGQLPacket,
graphqlOptions,
flags: flags,
};
Expand Down Expand Up @@ -82,8 +82,8 @@ export function graphqlWs(options: ReactiveGraphQLOptions | WSGraphQLOptionsFunc
.then((graphqlOptions: ReactiveGraphQLOptions) => {
const requests = new RequestsManager();
const subscription = ObservableFromWs(ws, graphqlOptions).subscribe({
next: (message) => {
requests.handleRequest(message, {
next: (request) => {
requests.handleRequest(request, {
next: (data) => ws.send(JSON.stringify(data)),
error: (e) => ws.close(1008, e.message),
complete: () => {/* noop */},
Expand Down

0 comments on commit 1b45921

Please sign in to comment.