Skip to content

Commit

Permalink
Connection stability improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
roj1512 committed Jun 5, 2024
1 parent 11b08a1 commit 96fef83
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 76 deletions.
10 changes: 7 additions & 3 deletions client/0_client_abstract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,29 @@ export abstract class ClientAbstract {
await initTgCrypto();
await this.transport.connection.open();
await this.transport.transport.initialize();
this.#disconnected = false;
}

async reconnect(dc?: DC) {
await this.disconnect();
await this.transport?.transport.deinitialize();
await this.transport?.connection.close();
if (dc) {
await this.setDc(dc);
this.setDc(dc);
}
await this.connect();
}

#disconnected = false;
async disconnect() {
if (!this.transport) {
throw new ConnectionError("Not connected.");
}
await this.transport.transport.deinitialize();
await this.transport.connection.close();
this.#disconnected = true;
}

get disconnected(): boolean {
return !this.transport?.transport.initialized;
return this.#disconnected;
}
}
2 changes: 2 additions & 0 deletions client/2_update_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,8 @@ export class UpdateManager {
unreachable();
}
}
} catch (err) {
this.#LrecoverUpdateGap.error(err);
} finally {
this.#c.resetConnectionState();
}
Expand Down
159 changes: 90 additions & 69 deletions client/5_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
*/

import { unreachable } from "../0_deps.ts";
import { AccessError, InputError } from "../0_errors.ts";
import { cleanObject, drop, getLogger, getRandomId, Logger, MaybePromise, minute, mustPrompt, mustPromptOneOf, second, ZERO_CHANNEL_ID } from "../1_utilities.ts";
import { AccessError, ConnectionError, InputError } from "../0_errors.ts";
import { cleanObject, drop, getLogger, getRandomId, Logger, MaybePromise, minute, mustPrompt, mustPromptOneOf, Mutex, second, ZERO_CHANNEL_ID } from "../1_utilities.ts";
import { Api, as, chatIdToPeerId, getChatIdPeerType, is, peerToChatId } from "../2_tl.ts";
import { Storage, StorageMemory } from "../2_storage.ts";
import { DC } from "../3_transport.ts";
Expand Down Expand Up @@ -324,12 +324,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
await this.#updateManager.recoverUpdateGap(source);
break;
case "decryption":
try {
await this.disconnect();
} catch {
//
}
await this.connect();
await this.reconnect();
await this.#updateManager.recoverUpdateGap(source);
break;
}
Expand Down Expand Up @@ -418,50 +413,63 @@ export class Client<C extends Context = Context> extends Composer<C> {
return transport;
};

if (params?.defaultHandlers ?? true) {
let reconnecting = false;
let lastReconnection: Date | null = null;
this.on("connectionState", ({ connectionState }, next) => {
if (connectionState != "notConnected") {
return;
}
if (this.disconnected) {
L.debug("not reconnecting");
return;
}
if (reconnecting) {
return;
}
reconnecting = true;
drop((async () => {
this.invoke.use(async ({ error }, next) => {
if (error instanceof ConnectionError) {
while (!this.connected) {
if (this.disconnected) {
return next();
}
try {
let delay = 5;
if (lastReconnection != null && Date.now() - lastReconnection.getTime() <= 10 * second) {
await new Promise((r) => setTimeout(r, delay * second));
}
while (!this.connected) {
L.debug("reconnecting");
try {
await this.connect();
lastReconnection = new Date();
L.debug("reconnected");
drop(this.#updateManager.recoverUpdateGap("reconnect"));
break;
} catch (err) {
if (delay < 15) {
delay += 5;
}
L.debug(`failed to reconnect, retrying in ${delay}:`, err);
await this.connect();
} catch {
//
}
}
return true;
} else {
return next();
}
});

let reconnecting = false;
this.on("connectionState", ({ connectionState }, next) => {
if (connectionState != "notConnected") {
return next();
}
if (this.disconnected) {
L.debug("not reconnecting");
return next();
}
if (reconnecting) {
return next();
}
reconnecting = true;
drop((async () => {
try {
let delay = 5;
while (!this.connected) {
L.debug("reconnecting");
try {
await this.connect();
L.debug("reconnected");
drop(this.#updateManager.recoverUpdateGap("reconnect"));
break;
} catch (err) {
if (delay < 15) {
delay += 5;
}
await new Promise((r) => setTimeout(r, delay * second));
L.debug(`failed to reconnect, retrying in ${delay}:`, err);
}
} finally {
reconnecting = false;
await new Promise((r) => setTimeout(r, delay * second));
}
})());
return next();
});
} finally {
reconnecting = false;
}
})());
return next();
});

if (params?.defaultHandlers ?? true) {
this.invoke.use(async ({ error }, next) => {
if (error instanceof FloodWait && error.seconds <= 10) {
L.warning("sleeping for", error.seconds, "because of:", error);
Expand Down Expand Up @@ -948,43 +956,56 @@ export class Client<C extends Context = Context> extends Composer<C> {
}
}

#connectMutex = new Mutex();
#lastConnect: Date | null = null;
/**
* Loads the session if `setDc` was not called, initializes and connnects
* a `ClientPlain` to generate auth key if there was none, and connects the client.
* Before establishing the connection, the session is saved.
*/
async connect() {
await this.#initStorage();
const [authKey, dc] = await Promise.all([this.storage.getAuthKey(), this.storage.getDc()]);
if (authKey != null && dc != null) {
await this.#client.setAuthKey(authKey);
this.#client.setDc(dc);
if (this.#client.serverSalt == 0n) {
this.#client.serverSalt = await this.storage.getServerSalt() ?? 0n;
}
} else {
const plain = new ClientPlain({ initialDc: this.#client.initialDc, transportProvider: this.#client.transportProvider, cdn: this.#client.cdn, publicKeys: this.#publicKeys });
const dc = await this.storage.getDc();
if (dc != null) {
plain.setDc(dc);
const unlock = await this.#connectMutex.lock();
if (this.connected) {
return;
}
if (this.#lastConnect != null && Date.now() - this.#lastConnect.getTime() <= 10 * second) {
await new Promise((r) => setTimeout(r, 3 * second));
}
try {
await this.#initStorage();
const [authKey, dc] = await Promise.all([this.storage.getAuthKey(), this.storage.getDc()]);
if (authKey != null && dc != null) {
await this.#client.setAuthKey(authKey);
this.#client.setDc(dc);
if (this.#client.serverSalt == 0n) {
this.#client.serverSalt = await this.storage.getServerSalt() ?? 0n;
}
} else {
const plain = new ClientPlain({ initialDc: this.#client.initialDc, transportProvider: this.#client.transportProvider, cdn: this.#client.cdn, publicKeys: this.#publicKeys });
const dc = await this.storage.getDc();
if (dc != null) {
plain.setDc(dc);
this.#client.setDc(dc);
}
await plain.connect();
const [authKey, serverSalt] = await plain.createAuthKey();
drop(plain.disconnect());
await this.#client.setAuthKey(authKey);
this.#client.serverSalt = serverSalt;
}
await plain.connect();
const [authKey, serverSalt] = await plain.createAuthKey();
drop(plain.disconnect());
await this.#client.setAuthKey(authKey);
this.#client.serverSalt = serverSalt;
await this.#client.connect();
this.#lastConnect = new Date();
await Promise.all([this.storage.setAuthKey(this.#client.authKey), this.storage.setDc(this.#client.dc), this.storage.setServerSalt(this.#client.serverSalt)]);
} finally {
unlock();
}
await this.#client.connect();
await Promise.all([this.storage.setAuthKey(this.#client.authKey), this.storage.setDc(this.#client.dc), this.storage.setServerSalt(this.#client.serverSalt)]);
}

async reconnect(dc?: DC) {
await this.disconnect();
if (dc) {
await this.setDc(dc);
}
await this.connect();
await this.#client.reconnect();
}

async [handleMigrationError](err: Migrate) {
Expand Down
4 changes: 3 additions & 1 deletion connection/1_connection_tcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export class ConnectionTCP implements Connection {
hostname: this.#hostname,
port: this.#port,
});
connection.setNoDelay(true);
connection.setKeepAlive(true);
this.#canRead = this.#canWrite = true;
this.stateChangeHandler?.(true);
Promise.resolve().then(async () => {
Expand Down Expand Up @@ -126,7 +128,7 @@ export class ConnectionTCP implements Connection {
this.callback?.write(wrote);
written += wrote;
} catch (err) {
if (err instanceof Deno.errors.BrokenPipe) {
if (err instanceof Deno.errors.BrokenPipe || err instanceof Deno.errors.ConnectionReset) {
this.#canWrite = false;
}
if (!this.connected) {
Expand Down
5 changes: 2 additions & 3 deletions transport/1_transport_abridged.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

import { concat } from "../0_deps.ts";
import { ConnectionError } from "../0_errors.ts";
import { bufferFromBigInt } from "../1_utilities.ts";
import { Connection } from "../2_connection.ts";
import { getObfuscationParameters } from "./0_obfuscation.ts";
Expand All @@ -43,8 +44,6 @@ export class TransportAbridged extends Transport implements Transport {
await this.#connection.write(new Uint8Array([0xEF]));
}
this.#initialized = true;
} else {
throw new Error("Transport already initialized");
}
}

Expand Down Expand Up @@ -78,7 +77,7 @@ export class TransportAbridged extends Transport implements Transport {

async send(buffer: Uint8Array) {
if (!this.initialized) {
throw new Error("Transport not initialized");
throw new ConnectionError("Transport not initialized");
}

const bufferLength = buffer.length / 4;
Expand Down

0 comments on commit 96fef83

Please sign in to comment.