diff --git a/client/1_client_encrypted.ts b/client/1_client_encrypted.ts index aaeab22f..d61c71b1 100644 --- a/client/1_client_encrypted.ts +++ b/client/1_client_encrypted.ts @@ -176,6 +176,12 @@ export class ClientEncrypted extends ClientAbstract { if (!this.transport) { unreachable(); } + + for (const [key, { reject }] of this.#promises.entries()) { + reject?.(new ConnectionError("Connection was closed")); + this.#promises.delete(key); + } + while (this.connected) { try { const buffer = await this.transport.transport.receive(); @@ -216,7 +222,7 @@ export class ClientEncrypted extends ClientAbstract { if (is("rpc_error", result)) { this.#LreceiveLoop.debug("RPCResult:", result.error_code, result.error_message); } else { - this.#LreceiveLoop.debug("RPCResult:", result._); + this.#LreceiveLoop.debug("RPCResult:", typeof result === "object" ? result._ : result); } const messageId = message.body.req_msg_id; const promise = this.#promises.get(messageId); diff --git a/client/2_file_manager.ts b/client/2_file_manager.ts index 932d7eb4..667b5865 100644 --- a/client/2_file_manager.ts +++ b/client/2_file_manager.ts @@ -18,13 +18,12 @@ * along with this program. If not, see . */ -import { extension, path, unreachable } from "../0_deps.ts"; +import { AssertionError, extension, path, unreachable } from "../0_deps.ts"; import { InputError } from "../0_errors.ts"; -import { drop, getLogger, getRandomId, iterateReadableStream, kilobyte, Logger, megabyte, minute, mod, Part, PartStream } from "../1_utilities.ts"; +import { drop, getLogger, getRandomId, iterateReadableStream, kilobyte, Logger, megabyte, minute, mod, Part, PartStream, second } from "../1_utilities.ts"; import { Api, as, is } from "../2_tl.ts"; import { constructSticker, deserializeFileId, FileId, FileSource, FileType, PhotoSourceType, serializeFileId, Sticker, toUniqueFileId } from "../3_types.ts"; import { STICKER_SET_NAME_TTL } from "../4_constants.ts"; -import { FloodWait } from "../4_errors.ts"; import { DownloadParams, UploadParams } from "./0_params.ts"; import { C, ConnectionPool } from "./1_types.ts"; @@ -94,8 +93,12 @@ export class FileManager { for await (part of iterateReadableStream(stream.pipeThrough(new PartStream(chunkSize)))) { promises.push( Promise.resolve().then(async () => { + let retryIn = 1; + let errorCount = 0; while (true) { try { + signal?.throwIfAborted(); + this.#Lupload.debug(`[${fileId}] uploading part ` + (part.part + 1)); if (part.small) { await invoke({ _: "upload.saveFilePart", file_id: fileId, bytes: part.bytes, file_part: part.part }); } else { @@ -106,7 +109,15 @@ export class FileManager { } catch (err) { signal?.throwIfAborted(); this.#Lupload.debug(`[${fileId}] failed to upload part ` + (part.part + 1)); - await this.#handleUploadError(err); + ++errorCount; + if (errorCount > 20) { + retryIn = 0; + } + await this.#handleError(err, retryIn, `[${fileId}-${part.part + 1}]`); + retryIn += 2; + if (retryIn > 11) { + retryIn = 11; + } } } }), @@ -128,6 +139,8 @@ export class FileManager { const isBig = buffer.byteLength > FileManager.#BIG_FILE_THRESHOLD; const partCount = Math.ceil(buffer.byteLength / chunkSize); let promises = new Array>(); + let started = false; + let delay = 0.05; main: for (let part = 0; part < partCount;) { for (let i = 0; i < pool.size; ++i) { const invoke = pool.invoke(); @@ -139,11 +152,20 @@ export class FileManager { break main; } const thisPart = part++; // `thisPart` must be used instead of `part` in the promise body + if (!started) { + started = true; + } else if (isBig) { + await new Promise((r) => setTimeout(r, delay)); + delay = Math.max(delay * .8, 0.003); + } promises.push( Promise.resolve().then(async () => { + let retryIn = 1; + let errorCount = 0; while (true) { try { signal?.throwIfAborted(); + this.#Lupload.debug(`[${fileId}] uploading part ` + (thisPart + 1)); if (isBig) { await invoke({ _: "upload.saveBigFilePart", file_id: fileId, file_part: thisPart, bytes, file_total_parts: partCount }); } else { @@ -153,8 +175,16 @@ export class FileManager { break; } catch (err) { signal?.throwIfAborted(); - this.#Lupload.debug(`[${fileId}] failed to upload part ` + (thisPart + 1) + " / " + partCount); - await this.#handleUploadError(err); + this.#Lupload.debug(`[${fileId}] failed to upload part ` + (thisPart + 1) + " / " + partCount, err); + ++errorCount; + if (errorCount > 20) { + retryIn = 0; + } + await this.#handleError(err, retryIn, `[${fileId}-${thisPart + 1}]`); + retryIn += 2; + if (retryIn > 11) { + retryIn = 11; + } } } }), @@ -168,10 +198,10 @@ export class FileManager { return { small: !isBig, parts: partCount }; } - async #handleUploadError(err: unknown) { - if (err instanceof FloodWait) { - this.#Lupload.warning("got a flood wait of " + err.seconds + " seconds"); - await new Promise((r) => setTimeout(r, err.seconds * 1000)); + async #handleError(err: unknown, retryIn: number, logPrefix: string) { + if (retryIn > 0) { + this.#Lupload.warning(`${logPrefix} retrying in ${retryIn} seconds`); + await new Promise((r) => setTimeout(r, retryIn * second)); } else { throw err; } @@ -280,24 +310,41 @@ export class FileManager { try { while (true) { - const file = await connection.invoke({ _: "upload.getFile", location, offset, limit }); + let retryIn = 1; + let errorCount = 0; + try { + const file = await connection.invoke({ _: "upload.getFile", location, offset, limit }); - if (is("upload.file", file)) { - yield file.bytes; - if (id != null) { - await this.#c.storage.saveFilePart(id, part, file.bytes); - } - ++part; - if (file.bytes.length < limit) { + if (is("upload.file", file)) { + yield file.bytes; if (id != null) { - await this.#c.storage.setFilePartCount(id, part + 1, chunkSize); + await this.#c.storage.saveFilePart(id, part, file.bytes); + } + ++part; + if (file.bytes.length < limit) { + if (id != null) { + await this.#c.storage.setFilePartCount(id, part + 1, chunkSize); + } + break; + } else { + offset += BigInt(file.bytes.length); } - break; } else { - offset += BigInt(file.bytes.length); + unreachable(); + } + } catch (err) { + if (typeof err === "object" && err instanceof AssertionError) { + throw err; + } + ++errorCount; + if (errorCount > 20) { + retryIn = 0; + } + await this.#handleError(err, retryIn, `[${id}-${part + 1}]`); + retryIn += 2; + if (retryIn > 11) { + retryIn = 11; } - } else { - unreachable(); } } } finally { diff --git a/client/2_update_manager.ts b/client/2_update_manager.ts index d8d6ea83..ab075723 100644 --- a/client/2_update_manager.ts +++ b/client/2_update_manager.ts @@ -134,6 +134,9 @@ export class UpdateManager { } async fetchState(source: string) { + if (this.#c.cdn) { + return; + } let state = await this.#c.invoke({ _: "updates.getState" }); const difference = await this.#c.invoke({ ...state, _: "updates.getDifference" }); if (is("updates.difference", difference)) { @@ -436,6 +439,9 @@ export class UpdateManager { #processUpdatesQueue = new Queue("UpdateManager/processUpdates"); processUpdates(updates: Api.Update | Api.Updates, checkGap: boolean, call: Api.AnyObject | null = null, callback?: () => void) { + if (this.#c.cdn) { + return; + } this.#processUpdatesQueue.add(() => this.#processUpdates(updates, checkGap, call).then(callback)); } @@ -629,6 +635,9 @@ export class UpdateManager { return localState; } async recoverUpdateGap(source: string) { + if (this.#c.cdn) { + return; + } this.#LrecoverUpdateGap.debug(`recovering from update gap [${source}]`); this.#c.setConnectionState("updating"); @@ -793,6 +802,9 @@ export class UpdateManager { } setUpdateHandler(handler: UpdateHandler) { + if (this.#c.cdn) { + return; + } this.#updateHandler = handler; } } diff --git a/client/5_client.ts b/client/5_client.ts index 955c20fa..7a85b828 100644 --- a/client/5_client.ts +++ b/client/5_client.ts @@ -293,6 +293,7 @@ export class Client extends Composer { readonly #ignoreOutgoing: boolean | null; #persistCache: boolean; + #cdn: boolean; #LsignIn: Logger; #LpingLoop: Logger; #LhandleMigrationError: Logger; @@ -366,6 +367,7 @@ export class Client extends Composer { this.#LpingLoop = L.branch("pingLoop"); this.#LhandleMigrationError = L.branch("[handleMigrationError]"); this.#L$initConncetion = L.branch("#initConnection"); + this.#cdn = params?.cdn ?? false; const c = { id, @@ -390,7 +392,7 @@ export class Client extends Composer { parseMode: this.#parseMode, getCdnConnection: this.#getCdnConnection.bind(this), getCdnConnectionPool: this.#getCdnConnectionPool.bind(this), - cdn: params?.cdn ?? false, + cdn: this.#cdn, ignoreOutgoing: this.#ignoreOutgoing, dropPendingUpdates: params?.dropPendingUpdates, }; @@ -417,14 +419,22 @@ export class Client extends Composer { }; if (params?.defaultHandlers ?? true) { + let reconnecting = false; let lastReconnection: Date | null = null; this.on("connectionState", ({ connectionState }, next) => { + if (connectionState != "notConnected") { + return; + } + if (this.disconnected) { + L.debug("not reconnecting"); + return; + } + if (reconnecting) { + return; + } + reconnecting = true; drop((async () => { - if (connectionState == "notConnected") { - if (this.disconnected) { - L.debug("not reconnecting"); - return; - } + try { let delay = 5; if (lastReconnection != null && Date.now() - lastReconnection.getTime() <= 10 * second) { await new Promise((r) => setTimeout(r, delay * second)); @@ -445,6 +455,8 @@ export class Client extends Composer { } await new Promise((r) => setTimeout(r, delay * second)); } + } finally { + reconnecting = false; } })()); return next(); @@ -537,7 +549,7 @@ export class Client extends Composer { }); return { - invoke: client.invoke, + invoke: client.invoke.bind(client), connect: async () => { await client.connect(); @@ -904,7 +916,7 @@ export class Client extends Composer { #lastPropagatedConnectionState: ConnectionState | null = null; #stateChangeHandler: (connected: boolean) => void = ((connected: boolean) => { const connectionState = connected ? "ready" : "notConnected"; - if (this.connected == connected && this.#lastPropagatedConnectionState != connectionState) { + if (this.#lastPropagatedConnectionState != connectionState) { this.#propagateConnectionState(connectionState); } }).bind(this); @@ -1196,6 +1208,9 @@ export class Client extends Composer { drop(this.#pingLoop()); } async #pingLoop() { + if (this.#cdn) { + return; + } this.#pingLoopAbortController = new AbortController(); while (this.connected) { try { @@ -1230,6 +1245,7 @@ export class Client extends Composer { while (true) { try { if (!this.#connectionInited && !isMtprotoFunction(function_)) { + this.#connectionInited = true; const result = await this.#client.invoke({ _: "initConnection", api_id: await this.#getApiId(), @@ -1245,7 +1261,6 @@ export class Client extends Composer { system_lang_code: this.systemLangCode, system_version: this.systemVersion, }, noWait); - this.#connectionInited = true; this.#L$initConncetion.debug("connection inited"); return result as R | void; } else { diff --git a/connection/1_connection_tcp.ts b/connection/1_connection_tcp.ts index 486a6416..35399a94 100644 --- a/connection/1_connection_tcp.ts +++ b/connection/1_connection_tcp.ts @@ -80,12 +80,10 @@ export class ConnectionTCP implements Connection { this.#canRead = false; break; } catch (err) { - if (!this.connected) { - this.stateChangeHandler?.(false); - this.#rejectRead(); - } else { - L.error(err); - } + this.#canRead = false; + this.stateChangeHandler?.(false); + this.#rejectRead(); + L.error(err); } } while (this.connected); this.stateChangeHandler?.(false);