-
-
Notifications
You must be signed in to change notification settings - Fork 180
/
websocket.ts
139 lines (130 loc) · 6.2 KB
/
websocket.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import {MessageStream} from '../../views/chat/messages/stream/messageStream';
import {MessageUtils} from '../../views/chat/messages/messageUtils';
import {CustomHandler, IWebsocketHandler} from './customHandler';
import {ErrorMessages} from '../errorMessages/errorMessages';
import {Messages} from '../../views/chat/messages/messages';
import {StreamSimulation} from '../../types/stream';
import {ServiceIO} from '../../services/serviceIO';
import {Response} from '../../types/response';
import {RequestUtils} from './requestUtils';
import {Demo} from '../demo/demo';
import {Stream} from './stream';
export type RoleToStream = {[role: string]: MessageStream};
export class Websocket {
public static setup(io: ServiceIO) {
if (io.connectSettings.url !== Demo.URL) {
io.permittedErrorPrefixes = ['Connection error', 'Error in server message'];
io.websocket = 'pending'; // main reason why not connecting here is because messages is not available yet
}
}
public static createConnection(io: ServiceIO, messages: Messages) {
if (!document.body.contains(io.deepChat)) return; // check if element is still present
const websocketConfig = io.connectSettings.websocket;
if (!websocketConfig) return;
if (io.connectSettings.handler) return CustomHandler.websocket(io, messages);
try {
const protocols = typeof websocketConfig !== 'boolean' ? websocketConfig : undefined;
// this will throw an error when url doesn't start with 'ws:'
const websocket = new WebSocket(io.connectSettings.url || '', protocols);
io.websocket = websocket;
io.websocket.onopen = () => {
messages.removeError();
if (io.websocket && typeof io.websocket === 'object') Websocket.assignListeners(io, websocket, messages);
io.deepChat._validationHandler?.();
};
io.websocket.onerror = (event) => {
console.error(event);
Websocket.retryConnection(io, messages);
};
} catch (error) {
console.error(error);
Websocket.retryConnection(io, messages);
}
}
private static retryConnection(io: ServiceIO, messages: Messages) {
io.deepChat._validationHandler?.();
if (!document.body.contains(io.deepChat)) return; // check if element is still present
io.websocket = 'pending';
if (!messages.isLastMessageError()) messages.addNewErrorMessage('service', 'Connection error');
setTimeout(() => {
Websocket.createConnection(io, messages);
}, 5000);
}
private static assignListeners(io: ServiceIO, ws: WebSocket, messages: Messages) {
const roleToStream = {} as RoleToStream;
ws.onmessage = async (message) => {
if (!io.extractResultData) return; // this return should theoretically not execute
try {
const result: Response = JSON.parse(message.data);
const finalResult = (await io.deepChat.responseInterceptor?.(result)) || result;
const resultData = await io.extractResultData(finalResult);
if (!resultData || typeof resultData !== 'object')
throw Error(ErrorMessages.INVALID_RESPONSE(result, 'server', !!io.deepChat.responseInterceptor, finalResult));
if (Stream.isSimulation(io.stream)) {
const upsertFunc = Websocket.stream.bind(this, io, messages, roleToStream);
const stream = roleToStream[result.role || MessageUtils.AI_ROLE];
Stream.upsertWFiles(messages, upsertFunc, stream, resultData);
} else {
messages.addNewMessage(resultData);
}
} catch (error) {
RequestUtils.displayError(messages, error as object, 'Error in server message');
}
};
ws.onclose = () => {
console.error('Connection closed');
// this is used to prevent two error messages displayed when websocket throws error and close events at the same time
if (!messages.isLastMessageError()) messages.addNewErrorMessage('service', 'Connection error');
if (io.stream) io.streamHandlers.abortStream.abort();
Websocket.createConnection(io, messages);
};
}
public static async sendWebsocket(io: ServiceIO, body: object, messages: Messages, stringifyBody = true) {
const ws = io.websocket;
if (!ws || ws === 'pending') return;
const requestDetails = {body, headers: io.connectSettings?.headers};
const {body: interceptedBody, error} = await RequestUtils.processRequestInterceptor(io.deepChat, requestDetails);
if (error) return messages.addNewErrorMessage('service', error);
if (!Websocket.isWebSocket(ws)) return ws.newUserMessage.listener(interceptedBody);
const processedBody = stringifyBody ? JSON.stringify(interceptedBody) : interceptedBody;
if (io.connectSettings?.url === Demo.URL) {
return Demo.request(io, messages);
}
if (ws.readyState === undefined || ws.readyState !== ws.OPEN) {
console.error('Connection is not open');
if (!messages.isLastMessageError()) messages.addNewErrorMessage('service', 'Connection error');
} else {
ws.send(JSON.stringify(processedBody));
io.completionsHandlers.onFinish();
}
}
public static canSendMessage(websocket: ServiceIO['websocket']) {
if (!websocket) return true;
if (websocket === 'pending') return false;
if (Websocket.isWebSocket(websocket)) {
return websocket.readyState !== undefined && websocket.readyState === websocket.OPEN;
}
return websocket.isOpen;
}
// if false then it is the internal websocket handler
private static isWebSocket(websocket: WebSocket | IWebsocketHandler): websocket is WebSocket {
return (websocket as WebSocket).send !== undefined;
}
public static stream(io: ServiceIO, messages: Messages, roleToStream: RoleToStream, result?: Response) {
if (!result) return;
const simulation = (io.stream as StreamSimulation).simulation;
if (typeof simulation === 'string') {
const role = result.role || MessageUtils.AI_ROLE;
const stream = roleToStream[role];
if (result.text === simulation || result.html === simulation) {
stream?.finaliseStreamedMessage();
delete roleToStream[role];
} else {
roleToStream[role] ??= new MessageStream(messages);
roleToStream[role].upsertStreamedMessage(result);
}
} else {
Stream.simulate(messages, io.streamHandlers, result);
}
}
}