From 96fef83dc72004229ace39ab659d43a1d526b420 Mon Sep 17 00:00:00 2001 From: Roj Date: Wed, 5 Jun 2024 17:34:47 +0300 Subject: [PATCH] Connection stability improvements --- client/0_client_abstract.ts | 10 +- client/2_update_manager.ts | 2 + client/5_client.ts | 159 +++++++++++++++++------------- connection/1_connection_tcp.ts | 4 +- transport/1_transport_abridged.ts | 5 +- 5 files changed, 104 insertions(+), 76 deletions(-) diff --git a/client/0_client_abstract.ts b/client/0_client_abstract.ts index a1657348..cafe39cd 100644 --- a/client/0_client_abstract.ts +++ b/client/0_client_abstract.ts @@ -82,25 +82,29 @@ export abstract class ClientAbstract { await initTgCrypto(); await this.transport.connection.open(); await this.transport.transport.initialize(); + this.#disconnected = false; } async reconnect(dc?: DC) { - await this.disconnect(); + await this.transport?.transport.deinitialize(); + await this.transport?.connection.close(); if (dc) { - await this.setDc(dc); + this.setDc(dc); } await this.connect(); } + #disconnected = false; async disconnect() { if (!this.transport) { throw new ConnectionError("Not connected."); } await this.transport.transport.deinitialize(); await this.transport.connection.close(); + this.#disconnected = true; } get disconnected(): boolean { - return !this.transport?.transport.initialized; + return this.#disconnected; } } diff --git a/client/2_update_manager.ts b/client/2_update_manager.ts index 91c50e72..f5fc5cb5 100644 --- a/client/2_update_manager.ts +++ b/client/2_update_manager.ts @@ -692,6 +692,8 @@ export class UpdateManager { unreachable(); } } + } catch (err) { + this.#LrecoverUpdateGap.error(err); } finally { this.#c.resetConnectionState(); } diff --git a/client/5_client.ts b/client/5_client.ts index 017f0bbe..d7748461 100644 --- a/client/5_client.ts +++ b/client/5_client.ts @@ -19,8 +19,8 @@ */ import { unreachable } from "../0_deps.ts"; -import { AccessError, InputError } from "../0_errors.ts"; -import { cleanObject, drop, getLogger, getRandomId, Logger, MaybePromise, minute, mustPrompt, mustPromptOneOf, second, ZERO_CHANNEL_ID } from "../1_utilities.ts"; +import { AccessError, ConnectionError, InputError } from "../0_errors.ts"; +import { cleanObject, drop, getLogger, getRandomId, Logger, MaybePromise, minute, mustPrompt, mustPromptOneOf, Mutex, second, ZERO_CHANNEL_ID } from "../1_utilities.ts"; import { Api, as, chatIdToPeerId, getChatIdPeerType, is, peerToChatId } from "../2_tl.ts"; import { Storage, StorageMemory } from "../2_storage.ts"; import { DC } from "../3_transport.ts"; @@ -324,12 +324,7 @@ export class Client extends Composer { await this.#updateManager.recoverUpdateGap(source); break; case "decryption": - try { - await this.disconnect(); - } catch { - // - } - await this.connect(); + await this.reconnect(); await this.#updateManager.recoverUpdateGap(source); break; } @@ -418,50 +413,63 @@ export class Client extends Composer { return transport; }; - if (params?.defaultHandlers ?? true) { - let reconnecting = false; - let lastReconnection: Date | null = null; - this.on("connectionState", ({ connectionState }, next) => { - if (connectionState != "notConnected") { - return; - } - if (this.disconnected) { - L.debug("not reconnecting"); - return; - } - if (reconnecting) { - return; - } - reconnecting = true; - drop((async () => { + this.invoke.use(async ({ error }, next) => { + if (error instanceof ConnectionError) { + while (!this.connected) { + if (this.disconnected) { + return next(); + } try { - let delay = 5; - if (lastReconnection != null && Date.now() - lastReconnection.getTime() <= 10 * second) { - await new Promise((r) => setTimeout(r, delay * second)); - } - while (!this.connected) { - L.debug("reconnecting"); - try { - await this.connect(); - lastReconnection = new Date(); - L.debug("reconnected"); - drop(this.#updateManager.recoverUpdateGap("reconnect")); - break; - } catch (err) { - if (delay < 15) { - delay += 5; - } - L.debug(`failed to reconnect, retrying in ${delay}:`, err); + await this.connect(); + } catch { + // + } + } + return true; + } else { + return next(); + } + }); + + let reconnecting = false; + this.on("connectionState", ({ connectionState }, next) => { + if (connectionState != "notConnected") { + return next(); + } + if (this.disconnected) { + L.debug("not reconnecting"); + return next(); + } + if (reconnecting) { + return next(); + } + reconnecting = true; + drop((async () => { + try { + let delay = 5; + while (!this.connected) { + L.debug("reconnecting"); + try { + await this.connect(); + L.debug("reconnected"); + drop(this.#updateManager.recoverUpdateGap("reconnect")); + break; + } catch (err) { + if (delay < 15) { + delay += 5; } - await new Promise((r) => setTimeout(r, delay * second)); + L.debug(`failed to reconnect, retrying in ${delay}:`, err); } - } finally { - reconnecting = false; + await new Promise((r) => setTimeout(r, delay * second)); } - })()); - return next(); - }); + } finally { + reconnecting = false; + } + })()); + return next(); + }); + if (params?.defaultHandlers ?? true) { this.invoke.use(async ({ error }, next) => { if (error instanceof FloodWait && error.seconds <= 10) { L.warning("sleeping for", error.seconds, "because of:", error); @@ -948,43 +956,56 @@ export class Client extends Composer { } } + #connectMutex = new Mutex(); + #lastConnect: Date | null = null; /** * Loads the session if `setDc` was not called, initializes and connnects * a `ClientPlain` to generate auth key if there was none, and connects the client. * Before establishing the connection, the session is saved. */ async connect() { - await this.#initStorage(); - const [authKey, dc] = await Promise.all([this.storage.getAuthKey(), this.storage.getDc()]); - if (authKey != null && dc != null) { - await this.#client.setAuthKey(authKey); - this.#client.setDc(dc); - if (this.#client.serverSalt == 0n) { - this.#client.serverSalt = await this.storage.getServerSalt() ?? 0n; - } - } else { - const plain = new ClientPlain({ initialDc: this.#client.initialDc, transportProvider: this.#client.transportProvider, cdn: this.#client.cdn, publicKeys: this.#publicKeys }); - const dc = await this.storage.getDc(); - if (dc != null) { - plain.setDc(dc); + const unlock = await this.#connectMutex.lock(); + if (this.connected) { + return; + } + if (this.#lastConnect != null && Date.now() - this.#lastConnect.getTime() <= 10 * second) { + await new Promise((r) => setTimeout(r, 3 * second)); + } + try { + await this.#initStorage(); + const [authKey, dc] = await Promise.all([this.storage.getAuthKey(), this.storage.getDc()]); + if (authKey != null && dc != null) { + await this.#client.setAuthKey(authKey); this.#client.setDc(dc); + if (this.#client.serverSalt == 0n) { + this.#client.serverSalt = await this.storage.getServerSalt() ?? 0n; + } + } else { + const plain = new ClientPlain({ initialDc: this.#client.initialDc, transportProvider: this.#client.transportProvider, cdn: this.#client.cdn, publicKeys: this.#publicKeys }); + const dc = await this.storage.getDc(); + if (dc != null) { + plain.setDc(dc); + this.#client.setDc(dc); + } + await plain.connect(); + const [authKey, serverSalt] = await plain.createAuthKey(); + drop(plain.disconnect()); + await this.#client.setAuthKey(authKey); + this.#client.serverSalt = serverSalt; } - await plain.connect(); - const [authKey, serverSalt] = await plain.createAuthKey(); - drop(plain.disconnect()); - await this.#client.setAuthKey(authKey); - this.#client.serverSalt = serverSalt; + await this.#client.connect(); + this.#lastConnect = new Date(); + await Promise.all([this.storage.setAuthKey(this.#client.authKey), this.storage.setDc(this.#client.dc), this.storage.setServerSalt(this.#client.serverSalt)]); + } finally { + unlock(); } - await this.#client.connect(); - await Promise.all([this.storage.setAuthKey(this.#client.authKey), this.storage.setDc(this.#client.dc), this.storage.setServerSalt(this.#client.serverSalt)]); } async reconnect(dc?: DC) { - await this.disconnect(); if (dc) { await this.setDc(dc); } - await this.connect(); + await this.#client.reconnect(); } async [handleMigrationError](err: Migrate) { diff --git a/connection/1_connection_tcp.ts b/connection/1_connection_tcp.ts index 35399a94..21ff2f62 100644 --- a/connection/1_connection_tcp.ts +++ b/connection/1_connection_tcp.ts @@ -61,6 +61,8 @@ export class ConnectionTCP implements Connection { hostname: this.#hostname, port: this.#port, }); + connection.setNoDelay(true); + connection.setKeepAlive(true); this.#canRead = this.#canWrite = true; this.stateChangeHandler?.(true); Promise.resolve().then(async () => { @@ -126,7 +128,7 @@ export class ConnectionTCP implements Connection { this.callback?.write(wrote); written += wrote; } catch (err) { - if (err instanceof Deno.errors.BrokenPipe) { + if (err instanceof Deno.errors.BrokenPipe || err instanceof Deno.errors.ConnectionReset) { this.#canWrite = false; } if (!this.connected) { diff --git a/transport/1_transport_abridged.ts b/transport/1_transport_abridged.ts index 27bcc046..0a00500c 100644 --- a/transport/1_transport_abridged.ts +++ b/transport/1_transport_abridged.ts @@ -19,6 +19,7 @@ */ import { concat } from "../0_deps.ts"; +import { ConnectionError } from "../0_errors.ts"; import { bufferFromBigInt } from "../1_utilities.ts"; import { Connection } from "../2_connection.ts"; import { getObfuscationParameters } from "./0_obfuscation.ts"; @@ -43,8 +44,6 @@ export class TransportAbridged extends Transport implements Transport { await this.#connection.write(new Uint8Array([0xEF])); } this.#initialized = true; - } else { - throw new Error("Transport already initialized"); } } @@ -78,7 +77,7 @@ export class TransportAbridged extends Transport implements Transport { async send(buffer: Uint8Array) { if (!this.initialized) { - throw new Error("Transport not initialized"); + throw new ConnectionError("Transport not initialized"); } const bufferLength = buffer.length / 4;