Skip to content

Commit

Permalink
use requestId to eliminate inconsistencies
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkav committed Aug 25, 2022
1 parent b544853 commit 98335a9
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions packages/insomnia/src/main/network/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ const timelineFileStreams = new Map<string, fs.WriteStream>();
// CTS flag; When set, the renderer thread is accepting new WebSocket events.
let clearToSend = true;

// Send queue map; holds batches of events for each event channel, to be sent upon receiving a CTS signal.
// Send queue map; holds batches of events for each event requestId, to be sent upon receiving a CTS signal.
const sendQueueMap = new Map<string, WebSocketEventLog>();

/**
* Dispatches a websocket event to a renderer, using batching control flow logic.
* When CTS is set, the events are sent immediately.
* If CTS is cleared, the events are batched into the send queue.
*/
function dispatchWebSocketEvent(target: Electron.WebContents, eventChannel: string, wsEvent: WebSocketEvent): void {
function dispatchWebSocketEvent(requestId:string, target: Electron.WebContents, eventChannel: string, wsEvent: WebSocketEvent): void {
// If the CTS flag is already set, just send immediately.
if (clearToSend) {
target.send(eventChannel, [wsEvent]);
Expand All @@ -85,11 +85,11 @@ function dispatchWebSocketEvent(target: Electron.WebContents, eventChannel: stri
}

// Otherwise, append to send queue for this event channel.
const sendQueue = sendQueueMap.get(eventChannel);
const sendQueue = sendQueueMap.get(requestId);
if (sendQueue) {
sendQueue.push(wsEvent);
} else {
sendQueueMap.set(eventChannel, [wsEvent]);
sendQueueMap.set(requestId, [wsEvent]);
}
}

Expand Down Expand Up @@ -179,7 +179,7 @@ async function createWebSocketConnection(

eventLogFileStreams.get(options.requestId)?.write(JSON.stringify(openEvent) + '\n');
timelineFileStreams.get(options.requestId)?.write(JSON.stringify({ value: 'WebSocket connection established', name: 'Text', timestamp: Date.now() }) + '\n');
dispatchWebSocketEvent(event.sender, eventChannel, openEvent);
dispatchWebSocketEvent(options.requestId, event.sender, eventChannel, openEvent);
event.sender.send(readyStateChannel, ws.readyState);
});

Expand All @@ -194,7 +194,7 @@ async function createWebSocketConnection(
};

eventLogFileStreams.get(options.requestId)?.write(JSON.stringify(messageEvent) + '\n');
dispatchWebSocketEvent(event.sender, eventChannel, messageEvent);
dispatchWebSocketEvent(options.requestId, event.sender, eventChannel, messageEvent);
});

ws.addEventListener('close', ({ code, reason, wasClean }) => {
Expand All @@ -208,11 +208,10 @@ async function createWebSocketConnection(
timestamp: Date.now(),
};

sendQueueMap.delete(eventChannel);
const message = `Closing connection with code ${code}`;
closeRequest(request._id, message, closeEvent);

dispatchWebSocketEvent(event.sender, eventChannel, closeEvent);
dispatchWebSocketEvent(options.requestId, event.sender, eventChannel, closeEvent);
event.sender.send(readyStateChannel, ws.readyState);
});

Expand All @@ -230,7 +229,7 @@ async function createWebSocketConnection(

closeRequest(request._id, message, errorEvent);

dispatchWebSocketEvent(event.sender, eventChannel, errorEvent);
dispatchWebSocketEvent(options.requestId, event.sender, eventChannel, errorEvent);
event.sender.send(readyStateChannel, ws.readyState);

const responsePatch = {
Expand Down Expand Up @@ -272,6 +271,7 @@ async function closeRequest(requestId: string, message: string, event?: WebSocke
timelineFileStreams.get(requestId)?.write(JSON.stringify({ value: message, name: 'Text', timestamp: Date.now() }) + '\n');
timelineFileStreams.get(requestId)?.end();
timelineFileStreams.delete(requestId);
sendQueueMap.delete(requestId);
WebSocketConnections.delete(requestId);
}

Expand Down Expand Up @@ -321,7 +321,7 @@ async function sendWebSocketEvent(
return;
}
const eventChannel = `webSocketRequest.connection.${response._id}.event`;
dispatchWebSocketEvent(event.sender, eventChannel, lastMessage);
dispatchWebSocketEvent(options.requestId, event.sender, eventChannel, lastMessage);
}

async function closeWebSocketConnection(
Expand Down Expand Up @@ -352,22 +352,22 @@ async function findMany(
* Sets the CTS flag; sent when the UI is ready for more events.
*/
function signalClearToSend(event: Electron.IpcMainInvokeEvent) {
const nextChannel = sendQueueMap.keys().next();
const nextRequestId = sendQueueMap.keys().next();

// There are no pending events; just set the CTS flag.
if (nextChannel.done) {
if (nextRequestId.done) {
clearToSend = true;
return;
}

// We have batched events; immediately send one batch.
const sendQueue = sendQueueMap.get(nextChannel.value);
const sendQueue = sendQueueMap.get(nextRequestId.value);
if (!sendQueue) {
return;
}

event.sender.send(nextChannel.value, sendQueue);
sendQueueMap.delete(nextChannel.value);
event.sender.send(nextRequestId.value, sendQueue);
sendQueueMap.delete(nextRequestId.value);
}

export function registerWebSocketHandlers() {
Expand Down

0 comments on commit 98335a9

Please sign in to comment.