forked from apollographql/apollo-server
/
index.ts
109 lines (99 loc) · 3.05 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import {
IObservable,
Observer,
Observable,
GraphQLOptions,
resolveGraphqlOptions,
ReactiveGraphQLOptions,
ReactiveRequest,
RGQLPacket,
RequestsManager,
} from 'graphql-server-reactive-core';
export { ReactiveGraphQLOptions };
import * as Websocket from 'ws';
export interface WSRequest extends ReactiveRequest {
flags: {
binary: boolean;
};
}
export interface WSGraphQLOptionsFunction {
(ws: Websocket): ReactiveGraphQLOptions | Promise<ReactiveGraphQLOptions>;
}
export interface WSHandler {
(ws: Websocket): void;
}
function ObservableFromWs(ws: Websocket, graphqlOptions: ReactiveGraphQLOptions): IObservable<WSRequest> {
return new Observable<WSRequest>((observer) => {
let nextListener = (data: any, flags: {binary: boolean}) => {
let request: WSRequest;
try {
try {
request = {
packet: JSON.parse(data) as RGQLPacket,
graphqlOptions,
flags: flags,
};
} catch (e) {
throw new Error('Message must be JSON-parseable.');
}
} catch (e) {
observer.error(e);
}
observer.next(request);
};
let errorListener = (e: Error) => {
observer.error(e);
};
let completeListener = () => {
observer.complete();
};
ws.on('message', nextListener);
ws.on('error', errorListener);
ws.on('close', completeListener);
return () => {
ws.removeListener('close', completeListener);
ws.removeListener('error', errorListener);
ws.removeListener('message', nextListener);
ws.close();
};
});
}
export function graphqlWs(options: ReactiveGraphQLOptions | WSGraphQLOptionsFunction): WSHandler {
if (!options) {
throw new Error('Apollo Server requires options.');
}
if (arguments.length > 1) {
throw new Error(`Apollo Server expects exactly one argument, got ${arguments.length}`);
}
return (ws): void => {
// XXX graphlWs should be called as event emitter callback,
// they do not support promises.
resolveGraphqlOptions(options, ws)
.then((graphqlOptions: ReactiveGraphQLOptions) => {
const requests = new RequestsManager();
const subscription = ObservableFromWs(ws, graphqlOptions).subscribe({
next: (request) => {
requests.handleRequest(request, {
next: (data) => ws.send(JSON.stringify(data)),
error: (e) => ws.close(1008, e.message),
complete: () => {/* noop */},
});
},
error: (e) => {
// RFC 6455 - 7.4.1 ==> 1008 indicates that an endpoint is terminating the connection
// because it has received a message that violates its policy.
return ws.close(1008, e.message);
},
complete: () => {
requests.unsubscribeAll();
ws.terminate();
},
});
})
.catch((e) => {
// RFC 6455 - 7.4.1 ==> 1002 indicates that an endpoint is terminating the connection due
// to a protocol error.
ws.close(1002, e.message);
});
};
}