Skip to content

Commit

Permalink
fix(ext/fetch): make EventSource more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
0f-0b committed Feb 27, 2024
1 parent f1a6912 commit aee159a
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 142 deletions.
2 changes: 1 addition & 1 deletion ext/fetch/26_fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async function mainFetch(req, recursive, terminator) {
try {
resp = await opFetchSend(requestRid);
} catch (err) {
if (terminator.aborted) return;
if (terminator.aborted) return abortedNetworkError();
throw err;
} finally {
if (cancelHandleRid !== null) {
Expand Down
285 changes: 144 additions & 141 deletions ext/fetch/27_eventsource.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const {
NumberIsNaN,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
Promise,
StringPrototypeEndsWith,
StringPrototypeIncludes,
StringPrototypeIndexOf,
Expand All @@ -32,6 +31,7 @@ import {
EventTarget,
setIsTrusted,
} from "ext:deno_web/02_event.js";
import { clearTimeout, setTimeout } from "ext:deno_web/02_timers.js";
import { TransformStream } from "ext:deno_web/06_streams.js";
import { TextDecoderStream } from "ext:deno_web/08_text_encoding.js";
import { getLocationHref } from "ext:deno_web/12_location.js";
Expand Down Expand Up @@ -106,11 +106,17 @@ const _reconnectionTime = Symbol("[[reconnectionTime]]");
const _lastEventID = Symbol("[[lastEventID]]");
const _abortController = Symbol("[[abortController]]");
const _loop = Symbol("[[loop]]");
const _reconnectionTimerId = Symbol("[[reconnectionTimerId]]");
const _reestablishConnection = Symbol("[[reestablishConnection]]");
const _failConnection = Symbol("[[failConnection]]");

class EventSource extends EventTarget {
/** @type {AbortController} */
[_abortController] = new AbortController();

/** @type {number | undefined} */
[_reconnectionTimerId];

/** @type {number} */
[_reconnectionTime] = 5000;

Expand Down Expand Up @@ -186,165 +192,162 @@ class EventSource extends EventTarget {
webidl.assertBranded(this, EventSourcePrototype);
this[_abortController].abort();
this[_readyState] = CLOSED;
clearTimeout(this[_reconnectionTimerId]);
}

async [_loop]() {
let lastEventIDValue = "";
while (this[_readyState] !== CLOSED) {
const lastEventIDValueCopy = lastEventIDValue;
lastEventIDValue = "";
const req = newInnerRequest(
"GET",
this[_url],
() =>
lastEventIDValueCopy === ""
? [
["accept", "text/event-stream"],
]
: [
["accept", "text/event-stream"],
[
"Last-Event-Id",
op_utf8_to_byte_string(lastEventIDValueCopy),
],
const req = newInnerRequest(
"GET",
this[_url],
() =>
this[_lastEventID] === ""
? [
["accept", "text/event-stream"],
]
: [
["accept", "text/event-stream"],
[
"Last-Event-Id",
op_utf8_to_byte_string(this[_lastEventID]),
],
null,
false,
);
/** @type {InnerResponse} */
const res = await mainFetch(req, true, this[_abortController].signal);

const contentType = ArrayPrototypeFind(
res.headerList,
(header) => StringPrototypeToLowerCase(header[0]) === "content-type",
);
if (res.type === "error") {
if (res.aborted) {
this[_readyState] = CLOSED;
this.dispatchEvent(new Event("error"));
break;
} else {
if (this[_readyState] === CLOSED) {
this[_abortController].abort();
break;
}
this[_readyState] = CONNECTING;
this.dispatchEvent(new Event("error"));
await new Promise((res) => setTimeout(res, this[_reconnectionTime]));
if (this[_readyState] !== CONNECTING) {
continue;
}
],
null,
false,
);
/** @type {InnerResponse} */
let res;
try {
res = await mainFetch(req, true, this[_abortController].signal);
} catch {
this[_reestablishConnection]();
return;
}

if (this[_lastEventID] !== "") {
lastEventIDValue = this[_lastEventID];
}
continue;
}
} else if (
res.status !== 200 ||
!StringPrototypeIncludes(
contentType?.[1].toLowerCase(),
"text/event-stream",
)
) {
this[_readyState] = CLOSED;
this.dispatchEvent(new Event("error"));
break;
}
if (res.aborted) {
this[_failConnection]();
return;
}
if (res.type === "error") {
this[_reestablishConnection]();
return;
}
const contentType = ArrayPrototypeFind(
res.headerList,
(header) => StringPrototypeToLowerCase(header[0]) === "content-type",
);
if (
res.status !== 200 ||
!StringPrototypeIncludes(
contentType ? StringPrototypeToLowerCase(contentType[1]) : "",
"text/event-stream",
)
) {
this[_failConnection]();
return;
}

if (this[_readyState] !== CLOSED) {
this[_readyState] = OPEN;
this.dispatchEvent(new Event("open"));
if (this[_readyState] === CLOSED) {
return;
}
this[_readyState] = OPEN;
this.dispatchEvent(new Event("open"));

let data = "";
let eventType = "";
let lastEventID = this[_lastEventID];
let data = "";
let eventType = "";
let lastEventID = this[_lastEventID];

for await (
// deno-lint-ignore prefer-primordials
const chunk of res.body.stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream({ allowCR: true }))
) {
if (chunk === "") {
this[_lastEventID] = lastEventID;
if (data === "") {
eventType = "";
continue;
}
if (StringPrototypeEndsWith(data, "\n")) {
data = StringPrototypeSlice(data, 0, -1);
}
const event = new MessageEvent(eventType || "message", {
data,
origin: res.url(),
lastEventId: this[_lastEventID],
});
setIsTrusted(event, true);
data = "";
try {
for await (
// deno-lint-ignore prefer-primordials
const chunk of res.body.stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream({ allowCR: true }))
) {
if (chunk === "") {
this[_lastEventID] = lastEventID;
if (data === "") {
eventType = "";
if (this[_readyState] !== CLOSED) {
this.dispatchEvent(event);
}
} else if (StringPrototypeStartsWith(chunk, ":")) {
continue;
} else {
let field = chunk;
let value = "";
if (StringPrototypeIncludes(chunk, ":")) {
({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":"));
if (StringPrototypeStartsWith(value, " ")) {
value = StringPrototypeSlice(value, 1);
}
}
if (StringPrototypeEndsWith(data, "\n")) {
data = StringPrototypeSlice(data, 0, -1);
}
const event = new MessageEvent(eventType || "message", {
data,
origin: res.url(),
lastEventId: this[_lastEventID],
});
setIsTrusted(event, true);
data = "";
eventType = "";
if (this[_readyState] !== CLOSED) {
this.dispatchEvent(event);
}
} else if (StringPrototypeStartsWith(chunk, ":")) {
continue;
} else {
let field = chunk;
let value = "";
if (StringPrototypeIncludes(chunk, ":")) {
({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":"));
if (StringPrototypeStartsWith(value, " ")) {
value = StringPrototypeSlice(value, 1);
}
}

switch (field) {
case "event": {
eventType = value;
break;
}
case "data": {
data += value + "\n";
break;
}
case "id": {
if (!StringPrototypeIncludes(value, "\0")) {
lastEventID = value;
}
break;
switch (field) {
case "event": {
eventType = value;
break;
}
case "data": {
data += value + "\n";
break;
}
case "id": {
if (!StringPrototypeIncludes(value, "\0")) {
lastEventID = value;
}
case "retry": {
const reconnectionTime = Number(value);
if (
!NumberIsNaN(reconnectionTime) &&
NumberIsFinite(reconnectionTime)
) {
this[_reconnectionTime] = reconnectionTime;
}
break;
break;
}
case "retry": {
const reconnectionTime = Number(value);
if (
!NumberIsNaN(reconnectionTime) &&
NumberIsFinite(reconnectionTime)
) {
this[_reconnectionTime] = reconnectionTime;
}
break;
}
}

if (this[_abortController].signal.aborted) {
break;
}
}
if (this[_readyState] === CLOSED) {
this[_abortController].abort();
break;
}
this[_readyState] = CONNECTING;
this.dispatchEvent(new Event("error"));
await new Promise((res) => setTimeout(res, this[_reconnectionTime]));
if (this[_readyState] !== CONNECTING) {
continue;
}
}
} catch {
// The connection is reestablished below
}

if (this[_lastEventID] !== "") {
lastEventIDValue = this[_lastEventID];
}
this[_reestablishConnection]();
}

[_reestablishConnection]() {
if (this[_readyState] === CLOSED) {
return;
}
this[_readyState] = CONNECTING;
this.dispatchEvent(new Event("error"));
this[_reconnectionTimerId] = setTimeout(() => {
if (this[_readyState] !== CONNECTING) {
return;
}
this[_loop]();
}, this[_reconnectionTime]);
}

[_failConnection]() {
if (this[_readyState] !== CLOSED) {
this[_readyState] = CLOSED;
this.dispatchEvent(new Event("error"));
}
}

Expand Down

0 comments on commit aee159a

Please sign in to comment.