Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

onSubscribe and use middleware to modify runtime SubscriptionOptions #78

Merged
merged 14 commits into from
May 16, 2017
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ npm-debug.log
browser/
.idea/
.vscode/
npm-debug.log.*
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

### vNEXT
- Client exposes new asyncronous middleware to modify `OperationOptions` [PR #78](https://github.com/apollographql/subscriptions-transport-ws/pull/78)

### 0.6.0

Expand All @@ -12,7 +13,6 @@
- Added support in the server executor for `graphql-js subscribe`. [PR #846](https://github.com/graphql/graphql-js/pull/846)

### 0.5.5

- Remove dependency on `graphql-tag/printer` per [graphql-tag#54](https://github.com/apollographql/graphql-tag/issues/54) [PR #98](https://github.com/apollographql/subscriptions-transport-ws/pull/98)

### 0.5.4
Expand Down
74 changes: 66 additions & 8 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import { ExecutionResult } from 'graphql/execution/execute';
import { print } from 'graphql/language/printer';
import { getOperationAST } from 'graphql/utilities/getOperationAST';

import MessageTypes from './message-types';
import { GRAPHQL_WS } from './protocol';
import { WS_TIMEOUT } from './defaults';
import { MiddlewareInterface } from './middleware';
import MessageTypes from './message-types';

export * from './helpers';

Expand Down Expand Up @@ -63,6 +64,8 @@ export class SubscriptionClient {
private wsImpl: any;
private wasKeepAliveReceived: boolean;
private checkConnectionTimeoutId: any;
private middlewares: MiddlewareInterface[];
private pendingSubscriptions: {[id: string]: boolean};

constructor(url: string, options?: ClientOptions, webSocketImpl?: any) {
const {
Expand Down Expand Up @@ -91,6 +94,8 @@ export class SubscriptionClient {
this.reconnectionAttempts = reconnectionAttempts;
this.backoff = new Backoff({ jitter: 0.5 });
this.eventEmitter = new EventEmitter();
this.middlewares = [];
this.pendingSubscriptions = {};

this.connect();
}
Expand Down Expand Up @@ -159,6 +164,13 @@ export class SubscriptionClient {
}

public unsubscribe(id: number) {
// operation hasn't sent message yet
// cancel without sending message to server
if (this.pendingSubscriptions[id]) {
delete this.pendingSubscriptions[id];
return;
}

if (this.operations[id]) {
delete this.operations[id];
}
Expand All @@ -171,17 +183,45 @@ export class SubscriptionClient {
});
}

private executeOperation(options: OperationOptions, handler: (error: Error[], result?: any) => void): number {
public applyMiddlewares(options: OperationOptions): Promise<OperationOptions> {
return new Promise((resolve, reject) => {
const queue = (funcs: MiddlewareInterface[], scope: any) => {
const next = () => {
if (funcs.length > 0) {
const f = funcs.shift();
if (f) {
f.applyMiddleware.apply(scope, [options, next]);
}
} else {
resolve(options);
}
};
next();
};

queue([...this.middlewares], this);
});
}

public use(middlewares: MiddlewareInterface[]): SubscriptionClient {
middlewares.map((middleware) => {
if (typeof middleware.applyMiddleware === 'function') {
this.middlewares.push(middleware);
} else {
throw new Error('Middleware must implement the applyMiddleware function');
}
});

return this;
}

private checkSubscriptionOptions(options: OperationOptions, handler: (error: Error[], result?: any) => void) {
const { query, variables, operationName } = options;

if (!query) {
throw new Error('Must provide a query.');
}

if (!handler) {
throw new Error('Must provide an handler.');
}

if (
( !isString(query) && !getOperationAST(query, operationName)) ||
( operationName && !isString(operationName)) ||
Expand All @@ -190,10 +230,28 @@ export class SubscriptionClient {
throw new Error('Incorrect option types. query must be a string or a document,' +
'`operationName` must be a string, and `variables` must be an object.');
}
}

private executeOperation(options: OperationOptions, handler: (error: Error[], result?: any) => void): number {
const opId = this.generateOperationId();
this.operations[opId] = { options, handler };
this.sendMessage(opId, MessageTypes.GQL_START, options);

// add subscription to operation
this.pendingSubscriptions[opId] = true;

this.applyMiddlewares(options).then(opts => {
this.checkSubscriptionOptions(opts, handler);

// if operation is unsubscribed already
// this.pendingSubscriptions[opId] will be deleted
if (this.pendingSubscriptions[opId]) {
delete this.pendingSubscriptions[opId];
this.operations[opId] = { options: opts, handler };
this.sendMessage(opId, MessageTypes.GQL_START, options);
}
}).catch((e: Error) => {
this.unsubscribe(opId);
handler([e]);
});

return opId;
}
Expand Down
5 changes: 5 additions & 0 deletions src/middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { OperationOptions } from './client';

export interface MiddlewareInterface {
applyMiddleware(options: OperationOptions, next: Function): void;
}
3 changes: 2 additions & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ConnectionContext = {

export interface OperationMessagePayload {
[key: string]: any; // this will support for example any options sent in init like the auth token
context?: any;
query?: string;
variables?: {[key: string]: any};
operationName?: string;
Expand Down Expand Up @@ -423,7 +424,7 @@ export class SubscriptionServer {
query: parsedMessage.payload.query,
variables: parsedMessage.payload.variables,
operationName: parsedMessage.payload.operationName,
context: Object.assign({}, isObject(initResult) ? initResult : {}),
context: Object.assign({}, isObject(initResult) ? initResult : {}, parsedMessage.payload.context),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is context sent over the network?
@Urigo @helfer @Stubilo - Thats a serious security issue, you shall not release the package with that

formatResponse: <any>undefined,
formatError: <any>undefined,
callback: <any>undefined,
Expand Down
122 changes: 85 additions & 37 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,38 +294,37 @@ describe('Client', function () {
});
});

it('should throw an exception when query is not provided', () => {
it('should throw an exception when query is not provided', (done) => {
const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`);

expect(() => {
client.subscribe({
query: undefined,
operationName: 'useInfo',
variables: {
id: 3,
},
}, function (error: any, result: any) {
//do nothing
client.subscribe({
query: undefined,
operationName: 'useInfo',
variables: {
id: 3,
},
);
}).to.throw();
}, function (error: any, result: any) {
expect(error).to.be.lengthOf(1);
done();
},
);
});

it('should throw an exception when query is not valid', () => {
it('should throw an exception when query is not valid', (done) => {
const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`);

expect(() => {
client.subscribe({
query: <string>{},
operationName: 'useInfo',
variables: {
id: 3,
},
}, function (error: any, result: any) {
//do nothing
client.subscribe({
query: <string>{},
operationName: 'useInfo',
variables: {
id: 3,
},
);
}).to.throw();
}, function (error: any, result: any) {
//do nothing
expect(error).to.be.lengthOf(1);
done();
},
);
});

it('should throw an exception when handler is not provided', () => {
Expand Down Expand Up @@ -411,6 +410,43 @@ describe('Client', function () {
});
});

it('should override OperationOptions with middleware', function (done) {
const CTX = 'testContext';
const CTX2 = 'overrideContext';
const client3 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`);
client3.use([{
applyMiddleware(opts, next) {
// modify options for SubscriptionClient.subscribe here
setTimeout(() => {
opts.context = CTX2;
next();
}, 100);
},
}]);

client3.subscribe({
query: `subscription context {
context
}`,
variables: {},
context: CTX,
}, (error: any, result: any) => {
client3.unsubscribeAll();
if (error) {
assert(false);
}
if (result) {
assert.property(result, 'context');
assert.equal(result.context, CTX2);
}
done();
},
);
setTimeout(() => {
subscriptionManager.publish('context', {});
}, 200);
});

it('should handle correctly GQL_CONNECTION_ERROR message', (done) => {
wsServer.on('connection', (connection: any) => {
connection.on('message', (message: any) => {
Expand Down Expand Up @@ -484,7 +520,7 @@ describe('Client', function () {
}, 100);
});

it('queues messages while websocket is still connecting', function () {
it('queues messages while websocket is still connecting', function (done) {
const client = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`);

let subId = client.subscribe({
Expand All @@ -502,12 +538,16 @@ describe('Client', function () {
//do nothing
},
);
expect((client as any).unsentMessagesQueue.length).to.equals(1);
client.unsubscribe(subId);
expect((client as any).unsentMessagesQueue.length).to.equals(2);
setTimeout(() => {
expect((client as any).unsentMessagesQueue.length).to.equals(0);
}, 100);

client.onConnect(() => {
expect((client as any).unsentMessagesQueue.length).to.equals(1);
client.unsubscribe(subId);

setTimeout(() => {
expect((client as any).unsentMessagesQueue.length).to.equals(0);
done();
}, 100);
});
});

it('should call error handler when graphql result has errors', function (done) {
Expand Down Expand Up @@ -875,15 +915,23 @@ describe('Server', function () {
id: 3,
},
}, function (error: any, result: any) {
//do nothing
});
if (error) {
assert(false);
done();
}

client.unsubscribe(subId);
if (result) {
client.unsubscribe(subId);
setTimeout(() => {
assert(eventsOptions.onUnsubscribe.calledOnce);
done();
}, 200);
}
});

setTimeout(() => {
assert(eventsOptions.onUnsubscribe.calledOnce);
done();
}, 200);
subscriptionManager.publish('user', {});
}, 100);
});

it('should send correct results to multiple clients with subscriptions', function (done) {
Expand Down