Skip to content

Commit

Permalink
chore(ws-core): support callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
DxCx committed Feb 21, 2017
1 parent 4d14c95 commit 1db893d
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 52 deletions.
14 changes: 7 additions & 7 deletions packages/graphql-server-reactive-core/src/runQueryReactive.ts
@@ -1,11 +1,11 @@
import {
ExecutionResult,
DocumentNode,
parse,
print,
validate,
formatError,
specifiedRules,
ExecutionResult,
DocumentNode,
parse,
print,
validate,
formatError,
specifiedRules,
} from 'graphql';
import { LogAction, LogFunction, LogMessage, LogStep, QueryOptions} from 'graphql-server-core';
import { Observable, IObservable } from 'graphql-server-observable';
Expand Down
263 changes: 256 additions & 7 deletions packages/graphql-server-reactive-protocol/src/RequestsManager.test.ts
Expand Up @@ -11,7 +11,7 @@ import {
ExecutionResult,
} from 'graphql';
import * as graphqlRxjs from 'graphql-rxjs';
import { Observable, BehaviorSubject, Subject } from 'rxjs';
import { Scheduler, Observable, BehaviorSubject, Subject } from 'rxjs';
import { IObservable } from 'graphql-server-observable';

import {
Expand Down Expand Up @@ -146,15 +146,18 @@ describe('RequestsManager', () => {
});

it('can handle simple requests', () => {
const input = Observable.of(<RGQLPacket>{
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
id: 1,
type: RGQL_MSG_START,
payload: {
query: `query { testString }`,
},
},
});
}),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = [
{
Expand Down Expand Up @@ -291,6 +294,246 @@ describe('RequestsManager', () => {
});
});

it('supports rejection on init packet error', () => {
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
type: RGQL_MSG_INIT,
payload: {},
},
}),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = {
type: RGQL_MSG_ERROR,
payload: undefined,
};

const reqMngr = new RequestsManager({
schema,
executor: graphqlRxjs,
}, input, {
onInit: () => false,
});

return wrapToRx(reqMngr.responseObservable)
.map((v) => v.data)
.toPromise().then((res) => {
const e: Error = res.payload as Error;
expect(e.message).to.be.equals('Prohibited connection!');

expected.payload = e;
expect(res).to.deep.equal(expected);
});
});

it('rejects bad return value from onRequestStart', () => {
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
id: 1,
type: RGQL_MSG_START,
payload: {
query: `query { testString }`,
},
},
}),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = {
id: 1,
type: RGQL_MSG_ERROR,
payload: undefined,
};

const reqMngr = new RequestsManager({
schema,
executor: graphqlRxjs,
}, input, {
onRequestStart: (() => false) as any,
});

return wrapToRx(reqMngr.responseObservable)
.map((v) => v.data)
.toPromise().then((res) => {
const e: Error = res.payload as Error;
expect(e.message).to.be.equals('Invalid params returned from onRequestStart');

expected.payload = e;
expect(res).to.deep.equal(expected);
});
});

it('rejects on exception from onRequestStop', () => {
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
id: 1,
type: RGQL_MSG_START,
payload: {
query: `query { testString }`,
},
},
}),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = {
id: 1,
type: RGQL_MSG_ERROR,
payload: undefined,
};

const reqMngr = new RequestsManager({
schema,
executor: graphqlRxjs,
}, input, {
onRequestStop: (requestId) => {
throw new Error('onRequestStop test error');
},
});

return wrapToRx(reqMngr.responseObservable)
.map((v) => v.data)
.toPromise().then((res) => {
const e: Error = res.payload as Error;
expect(e.message).to.be.equals('onRequestStop test error');

expected.payload = e;
expect(res).to.deep.equal(expected);
});
});

it('doesn\'t fail if onDisconnect throws', () => {
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
id: 1,
type: RGQL_MSG_START,
payload: {
query: `query { testString }`,
},
},
}),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = [
{
id: 1,
type: RGQL_MSG_DATA,
payload: {
data: {
testString: 'it works',
},
},
},

// finite result, the server sends complete.
{
id: 1,
type: RGQL_MSG_COMPLETE,
},
];

const reqMngr = new RequestsManager({
schema,
executor: graphqlRxjs,
}, input, {
onDisconnect: () => {
throw new Error('onDisconnect error test');
},
});

return wrapToRx(reqMngr.responseObservable)
.map((v) => v.data)
.bufferCount(expected.length + 1)
.toPromise().then((res) => {
expect(res).to.deep.equal(expected);
});
});

it('supports callbacks', () => {
const initPayload = 'initString';
let onDisconnectCalled = false;
let onStartCalled = false;
let onStopCalled = false;
let actualyInitPayload;

const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
type: RGQL_MSG_INIT,
payload: initPayload,
},
}),
Observable.of({
data: {
id: 1,
type: RGQL_MSG_START,
payload: {
query: `subscription { testInterval(interval: 50) }`,
},
},
}).delay(50),
Observable.of({
data: {
id: 1,
type: RGQL_MSG_STOP,
},
}).delay(124)]).concatAll();

const expected: RGQLPacketData[] = [{
type: RGQL_MSG_INIT_SUCCESS,
}];
for ( let i = 0 ; i < 2 ; i ++ ) {
expected.push({
id: 1,
type: RGQL_MSG_DATA,
payload: {
data: {
testInterval: i,
},
},
});
}

const reqMngr = new RequestsManager({
schema,
executor: graphqlRxjs,
}, input, {
onInit: (payload) => {
actualyInitPayload = payload;
return true;
},
onRequestStart: (requestId, requestParams) => {
expect(requestId).to.be.equals(1);
onStartCalled = true;
return requestParams;
},
onRequestStop: (requestId) => {
expect(requestId).to.be.equals(1);
onStopCalled = true;
},
onDisconnect: () => {
onDisconnectCalled = true;
},
});

return wrapToRx(reqMngr.responseObservable)
.map((v) => v.data)
.bufferCount(expected.length + 1)
.toPromise().then((res) => {
expect(res).to.deep.equal(expected);
expect(actualyInitPayload).to.be.equals(initPayload);
expect(onStartCalled).to.be.equals(true);
expect(onStopCalled).to.be.equals(true);
expect(onDisconnectCalled).to.be.equals(true);
});
});

it('able to subscribe to changes', () => {
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
Expand Down Expand Up @@ -381,15 +624,18 @@ describe('RequestsManager', () => {
});

it('runs formatParams if provided', () => {
const input = Observable.of(<RGQLPacket>{
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
data: {
id: 1,
type: RGQL_MSG_START,
payload: {
query: `query { testString }`,
},
},
});
}),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = [
{
Expand Down Expand Up @@ -421,8 +667,8 @@ describe('RequestsManager', () => {
.map((v) => v.data)
.bufferCount(expected.length + 1)
.toPromise().then((res) => {
expect(res).to.deep.equal(expected);
expect(requestParamsRun).to.be.equals(true);
expect(res).to.deep.equal(expected);
});
});

Expand All @@ -437,7 +683,10 @@ describe('RequestsManager', () => {
},
},
};
const input = Observable.of(inputPacket);
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of(inputPacket),
// add delay to allow promises to resolve.
Observable.empty().delay(100)]).concatAll();

const expected = [{
metadata: 'packet-info',
Expand Down

0 comments on commit 1db893d

Please sign in to comment.