diff --git a/client/1_client_encrypted.ts b/client/1_client_encrypted.ts
index aaeab22f..1d7dddd2 100644
--- a/client/1_client_encrypted.ts
+++ b/client/1_client_encrypted.ts
@@ -176,6 +176,12 @@ export class ClientEncrypted extends ClientAbstract {
if (!this.transport) {
unreachable();
}
+
+ for (const [key, { reject }] of this.#promises.entries()) {
+ reject?.(new ConnectionError("Connection was closed"));
+ this.#promises.delete(key);
+ }
+
while (this.connected) {
try {
const buffer = await this.transport.transport.receive();
@@ -216,7 +222,7 @@ export class ClientEncrypted extends ClientAbstract {
if (is("rpc_error", result)) {
this.#LreceiveLoop.debug("RPCResult:", result.error_code, result.error_message);
} else {
- this.#LreceiveLoop.debug("RPCResult:", result._);
+ this.#LreceiveLoop.debug("RPCResult:", Array.isArray(result) ? "Array" : typeof result === "object" ? result._ : result);
}
const messageId = message.body.req_msg_id;
const promise = this.#promises.get(messageId);
diff --git a/client/2_file_manager.ts b/client/2_file_manager.ts
index 932d7eb4..667b5865 100644
--- a/client/2_file_manager.ts
+++ b/client/2_file_manager.ts
@@ -18,13 +18,12 @@
* along with this program. If not, see .
*/
-import { extension, path, unreachable } from "../0_deps.ts";
+import { AssertionError, extension, path, unreachable } from "../0_deps.ts";
import { InputError } from "../0_errors.ts";
-import { drop, getLogger, getRandomId, iterateReadableStream, kilobyte, Logger, megabyte, minute, mod, Part, PartStream } from "../1_utilities.ts";
+import { drop, getLogger, getRandomId, iterateReadableStream, kilobyte, Logger, megabyte, minute, mod, Part, PartStream, second } from "../1_utilities.ts";
import { Api, as, is } from "../2_tl.ts";
import { constructSticker, deserializeFileId, FileId, FileSource, FileType, PhotoSourceType, serializeFileId, Sticker, toUniqueFileId } from "../3_types.ts";
import { STICKER_SET_NAME_TTL } from "../4_constants.ts";
-import { FloodWait } from "../4_errors.ts";
import { DownloadParams, UploadParams } from "./0_params.ts";
import { C, ConnectionPool } from "./1_types.ts";
@@ -94,8 +93,12 @@ export class FileManager {
for await (part of iterateReadableStream(stream.pipeThrough(new PartStream(chunkSize)))) {
promises.push(
Promise.resolve().then(async () => {
+ let retryIn = 1;
+ let errorCount = 0;
while (true) {
try {
+ signal?.throwIfAborted();
+ this.#Lupload.debug(`[${fileId}] uploading part ` + (part.part + 1));
if (part.small) {
await invoke({ _: "upload.saveFilePart", file_id: fileId, bytes: part.bytes, file_part: part.part });
} else {
@@ -106,7 +109,15 @@ export class FileManager {
} catch (err) {
signal?.throwIfAborted();
this.#Lupload.debug(`[${fileId}] failed to upload part ` + (part.part + 1));
- await this.#handleUploadError(err);
+ ++errorCount;
+ if (errorCount > 20) {
+ retryIn = 0;
+ }
+ await this.#handleError(err, retryIn, `[${fileId}-${part.part + 1}]`);
+ retryIn += 2;
+ if (retryIn > 11) {
+ retryIn = 11;
+ }
}
}
}),
@@ -128,6 +139,8 @@ export class FileManager {
const isBig = buffer.byteLength > FileManager.#BIG_FILE_THRESHOLD;
const partCount = Math.ceil(buffer.byteLength / chunkSize);
let promises = new Array>();
+ let started = false;
+ let delay = 0.05;
main: for (let part = 0; part < partCount;) {
for (let i = 0; i < pool.size; ++i) {
const invoke = pool.invoke();
@@ -139,11 +152,20 @@ export class FileManager {
break main;
}
const thisPart = part++; // `thisPart` must be used instead of `part` in the promise body
+ if (!started) {
+ started = true;
+ } else if (isBig) {
+ await new Promise((r) => setTimeout(r, delay));
+ delay = Math.max(delay * .8, 0.003);
+ }
promises.push(
Promise.resolve().then(async () => {
+ let retryIn = 1;
+ let errorCount = 0;
while (true) {
try {
signal?.throwIfAborted();
+ this.#Lupload.debug(`[${fileId}] uploading part ` + (thisPart + 1));
if (isBig) {
await invoke({ _: "upload.saveBigFilePart", file_id: fileId, file_part: thisPart, bytes, file_total_parts: partCount });
} else {
@@ -153,8 +175,16 @@ export class FileManager {
break;
} catch (err) {
signal?.throwIfAborted();
- this.#Lupload.debug(`[${fileId}] failed to upload part ` + (thisPart + 1) + " / " + partCount);
- await this.#handleUploadError(err);
+ this.#Lupload.debug(`[${fileId}] failed to upload part ` + (thisPart + 1) + " / " + partCount, err);
+ ++errorCount;
+ if (errorCount > 20) {
+ retryIn = 0;
+ }
+ await this.#handleError(err, retryIn, `[${fileId}-${thisPart + 1}]`);
+ retryIn += 2;
+ if (retryIn > 11) {
+ retryIn = 11;
+ }
}
}
}),
@@ -168,10 +198,10 @@ export class FileManager {
return { small: !isBig, parts: partCount };
}
- async #handleUploadError(err: unknown) {
- if (err instanceof FloodWait) {
- this.#Lupload.warning("got a flood wait of " + err.seconds + " seconds");
- await new Promise((r) => setTimeout(r, err.seconds * 1000));
+ async #handleError(err: unknown, retryIn: number, logPrefix: string) {
+ if (retryIn > 0) {
+ this.#Lupload.warning(`${logPrefix} retrying in ${retryIn} seconds`);
+ await new Promise((r) => setTimeout(r, retryIn * second));
} else {
throw err;
}
@@ -280,24 +310,41 @@ export class FileManager {
try {
while (true) {
- const file = await connection.invoke({ _: "upload.getFile", location, offset, limit });
+ let retryIn = 1;
+ let errorCount = 0;
+ try {
+ const file = await connection.invoke({ _: "upload.getFile", location, offset, limit });
- if (is("upload.file", file)) {
- yield file.bytes;
- if (id != null) {
- await this.#c.storage.saveFilePart(id, part, file.bytes);
- }
- ++part;
- if (file.bytes.length < limit) {
+ if (is("upload.file", file)) {
+ yield file.bytes;
if (id != null) {
- await this.#c.storage.setFilePartCount(id, part + 1, chunkSize);
+ await this.#c.storage.saveFilePart(id, part, file.bytes);
+ }
+ ++part;
+ if (file.bytes.length < limit) {
+ if (id != null) {
+ await this.#c.storage.setFilePartCount(id, part + 1, chunkSize);
+ }
+ break;
+ } else {
+ offset += BigInt(file.bytes.length);
}
- break;
} else {
- offset += BigInt(file.bytes.length);
+ unreachable();
+ }
+ } catch (err) {
+ if (typeof err === "object" && err instanceof AssertionError) {
+ throw err;
+ }
+ ++errorCount;
+ if (errorCount > 20) {
+ retryIn = 0;
+ }
+ await this.#handleError(err, retryIn, `[${id}-${part + 1}]`);
+ retryIn += 2;
+ if (retryIn > 11) {
+ retryIn = 11;
}
- } else {
- unreachable();
}
}
} finally {
diff --git a/client/2_update_manager.ts b/client/2_update_manager.ts
index d8d6ea83..ab075723 100644
--- a/client/2_update_manager.ts
+++ b/client/2_update_manager.ts
@@ -134,6 +134,9 @@ export class UpdateManager {
}
async fetchState(source: string) {
+ if (this.#c.cdn) {
+ return;
+ }
let state = await this.#c.invoke({ _: "updates.getState" });
const difference = await this.#c.invoke({ ...state, _: "updates.getDifference" });
if (is("updates.difference", difference)) {
@@ -436,6 +439,9 @@ export class UpdateManager {
#processUpdatesQueue = new Queue("UpdateManager/processUpdates");
processUpdates(updates: Api.Update | Api.Updates, checkGap: boolean, call: Api.AnyObject | null = null, callback?: () => void) {
+ if (this.#c.cdn) {
+ return;
+ }
this.#processUpdatesQueue.add(() => this.#processUpdates(updates, checkGap, call).then(callback));
}
@@ -629,6 +635,9 @@ export class UpdateManager {
return localState;
}
async recoverUpdateGap(source: string) {
+ if (this.#c.cdn) {
+ return;
+ }
this.#LrecoverUpdateGap.debug(`recovering from update gap [${source}]`);
this.#c.setConnectionState("updating");
@@ -793,6 +802,9 @@ export class UpdateManager {
}
setUpdateHandler(handler: UpdateHandler) {
+ if (this.#c.cdn) {
+ return;
+ }
this.#updateHandler = handler;
}
}
diff --git a/client/5_client.ts b/client/5_client.ts
index 955c20fa..7a85b828 100644
--- a/client/5_client.ts
+++ b/client/5_client.ts
@@ -293,6 +293,7 @@ export class Client extends Composer {
readonly #ignoreOutgoing: boolean | null;
#persistCache: boolean;
+ #cdn: boolean;
#LsignIn: Logger;
#LpingLoop: Logger;
#LhandleMigrationError: Logger;
@@ -366,6 +367,7 @@ export class Client extends Composer {
this.#LpingLoop = L.branch("pingLoop");
this.#LhandleMigrationError = L.branch("[handleMigrationError]");
this.#L$initConncetion = L.branch("#initConnection");
+ this.#cdn = params?.cdn ?? false;
const c = {
id,
@@ -390,7 +392,7 @@ export class Client extends Composer {
parseMode: this.#parseMode,
getCdnConnection: this.#getCdnConnection.bind(this),
getCdnConnectionPool: this.#getCdnConnectionPool.bind(this),
- cdn: params?.cdn ?? false,
+ cdn: this.#cdn,
ignoreOutgoing: this.#ignoreOutgoing,
dropPendingUpdates: params?.dropPendingUpdates,
};
@@ -417,14 +419,22 @@ export class Client extends Composer {
};
if (params?.defaultHandlers ?? true) {
+ let reconnecting = false;
let lastReconnection: Date | null = null;
this.on("connectionState", ({ connectionState }, next) => {
+ if (connectionState != "notConnected") {
+ return;
+ }
+ if (this.disconnected) {
+ L.debug("not reconnecting");
+ return;
+ }
+ if (reconnecting) {
+ return;
+ }
+ reconnecting = true;
drop((async () => {
- if (connectionState == "notConnected") {
- if (this.disconnected) {
- L.debug("not reconnecting");
- return;
- }
+ try {
let delay = 5;
if (lastReconnection != null && Date.now() - lastReconnection.getTime() <= 10 * second) {
await new Promise((r) => setTimeout(r, delay * second));
@@ -445,6 +455,8 @@ export class Client extends Composer {
}
await new Promise((r) => setTimeout(r, delay * second));
}
+ } finally {
+ reconnecting = false;
}
})());
return next();
@@ -537,7 +549,7 @@ export class Client extends Composer {
});
return {
- invoke: client.invoke,
+ invoke: client.invoke.bind(client),
connect: async () => {
await client.connect();
@@ -904,7 +916,7 @@ export class Client extends Composer {
#lastPropagatedConnectionState: ConnectionState | null = null;
#stateChangeHandler: (connected: boolean) => void = ((connected: boolean) => {
const connectionState = connected ? "ready" : "notConnected";
- if (this.connected == connected && this.#lastPropagatedConnectionState != connectionState) {
+ if (this.#lastPropagatedConnectionState != connectionState) {
this.#propagateConnectionState(connectionState);
}
}).bind(this);
@@ -1196,6 +1208,9 @@ export class Client extends Composer {
drop(this.#pingLoop());
}
async #pingLoop() {
+ if (this.#cdn) {
+ return;
+ }
this.#pingLoopAbortController = new AbortController();
while (this.connected) {
try {
@@ -1230,6 +1245,7 @@ export class Client extends Composer {
while (true) {
try {
if (!this.#connectionInited && !isMtprotoFunction(function_)) {
+ this.#connectionInited = true;
const result = await this.#client.invoke({
_: "initConnection",
api_id: await this.#getApiId(),
@@ -1245,7 +1261,6 @@ export class Client extends Composer {
system_lang_code: this.systemLangCode,
system_version: this.systemVersion,
}, noWait);
- this.#connectionInited = true;
this.#L$initConncetion.debug("connection inited");
return result as R | void;
} else {
diff --git a/connection/1_connection_tcp.ts b/connection/1_connection_tcp.ts
index 486a6416..35399a94 100644
--- a/connection/1_connection_tcp.ts
+++ b/connection/1_connection_tcp.ts
@@ -80,12 +80,10 @@ export class ConnectionTCP implements Connection {
this.#canRead = false;
break;
} catch (err) {
- if (!this.connected) {
- this.stateChangeHandler?.(false);
- this.#rejectRead();
- } else {
- L.error(err);
- }
+ this.#canRead = false;
+ this.stateChangeHandler?.(false);
+ this.#rejectRead();
+ L.error(err);
}
} while (this.connected);
this.stateChangeHandler?.(false);