Skip to content

Commit

Permalink
Fix issues with uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
roj1512 committed May 25, 2024
1 parent 7772089 commit 02f86fa
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 39 deletions.
8 changes: 7 additions & 1 deletion client/1_client_encrypted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ export class ClientEncrypted extends ClientAbstract {
if (!this.transport) {
unreachable();
}

for (const [key, { reject }] of this.#promises.entries()) {
reject?.(new ConnectionError("Connection was closed"));
this.#promises.delete(key);
}

while (this.connected) {
try {
const buffer = await this.transport.transport.receive();
Expand Down Expand Up @@ -216,7 +222,7 @@ export class ClientEncrypted extends ClientAbstract {
if (is("rpc_error", result)) {
this.#LreceiveLoop.debug("RPCResult:", result.error_code, result.error_message);
} else {
this.#LreceiveLoop.debug("RPCResult:", result._);
this.#LreceiveLoop.debug("RPCResult:", typeof result === "object" ? result._ : result);
}
const messageId = message.body.req_msg_id;
const promise = this.#promises.get(messageId);
Expand Down
93 changes: 70 additions & 23 deletions client/2_file_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import { extension, path, unreachable } from "../0_deps.ts";
import { AssertionError, extension, path, unreachable } from "../0_deps.ts";
import { InputError } from "../0_errors.ts";
import { drop, getLogger, getRandomId, iterateReadableStream, kilobyte, Logger, megabyte, minute, mod, Part, PartStream } from "../1_utilities.ts";
import { drop, getLogger, getRandomId, iterateReadableStream, kilobyte, Logger, megabyte, minute, mod, Part, PartStream, second } from "../1_utilities.ts";
import { Api, as, is } from "../2_tl.ts";
import { constructSticker, deserializeFileId, FileId, FileSource, FileType, PhotoSourceType, serializeFileId, Sticker, toUniqueFileId } from "../3_types.ts";
import { STICKER_SET_NAME_TTL } from "../4_constants.ts";
import { FloodWait } from "../4_errors.ts";
import { DownloadParams, UploadParams } from "./0_params.ts";
import { C, ConnectionPool } from "./1_types.ts";

Expand Down Expand Up @@ -94,8 +93,12 @@ export class FileManager {
for await (part of iterateReadableStream(stream.pipeThrough(new PartStream(chunkSize)))) {
promises.push(
Promise.resolve().then(async () => {
let retryIn = 1;
let errorCount = 0;
while (true) {
try {
signal?.throwIfAborted();
this.#Lupload.debug(`[${fileId}] uploading part ` + (part.part + 1));
if (part.small) {
await invoke({ _: "upload.saveFilePart", file_id: fileId, bytes: part.bytes, file_part: part.part });
} else {
Expand All @@ -106,7 +109,15 @@ export class FileManager {
} catch (err) {
signal?.throwIfAborted();
this.#Lupload.debug(`[${fileId}] failed to upload part ` + (part.part + 1));
await this.#handleUploadError(err);
++errorCount;
if (errorCount > 20) {
retryIn = 0;
}
await this.#handleError(err, retryIn, `[${fileId}-${part.part + 1}]`);
retryIn += 2;
if (retryIn > 11) {
retryIn = 11;
}
}
}
}),
Expand All @@ -128,6 +139,8 @@ export class FileManager {
const isBig = buffer.byteLength > FileManager.#BIG_FILE_THRESHOLD;
const partCount = Math.ceil(buffer.byteLength / chunkSize);
let promises = new Array<Promise<void>>();
let started = false;
let delay = 0.05;
main: for (let part = 0; part < partCount;) {
for (let i = 0; i < pool.size; ++i) {
const invoke = pool.invoke();
Expand All @@ -139,11 +152,20 @@ export class FileManager {
break main;
}
const thisPart = part++; // `thisPart` must be used instead of `part` in the promise body
if (!started) {
started = true;
} else if (isBig) {
await new Promise((r) => setTimeout(r, delay));
delay = Math.max(delay * .8, 0.003);
}
promises.push(
Promise.resolve().then(async () => {
let retryIn = 1;
let errorCount = 0;
while (true) {
try {
signal?.throwIfAborted();
this.#Lupload.debug(`[${fileId}] uploading part ` + (thisPart + 1));
if (isBig) {
await invoke({ _: "upload.saveBigFilePart", file_id: fileId, file_part: thisPart, bytes, file_total_parts: partCount });
} else {
Expand All @@ -153,8 +175,16 @@ export class FileManager {
break;
} catch (err) {
signal?.throwIfAborted();
this.#Lupload.debug(`[${fileId}] failed to upload part ` + (thisPart + 1) + " / " + partCount);
await this.#handleUploadError(err);
this.#Lupload.debug(`[${fileId}] failed to upload part ` + (thisPart + 1) + " / " + partCount, err);
++errorCount;
if (errorCount > 20) {
retryIn = 0;
}
await this.#handleError(err, retryIn, `[${fileId}-${thisPart + 1}]`);
retryIn += 2;
if (retryIn > 11) {
retryIn = 11;
}
}
}
}),
Expand All @@ -168,10 +198,10 @@ export class FileManager {
return { small: !isBig, parts: partCount };
}

async #handleUploadError(err: unknown) {
if (err instanceof FloodWait) {
this.#Lupload.warning("got a flood wait of " + err.seconds + " seconds");
await new Promise((r) => setTimeout(r, err.seconds * 1000));
async #handleError(err: unknown, retryIn: number, logPrefix: string) {
if (retryIn > 0) {
this.#Lupload.warning(`${logPrefix} retrying in ${retryIn} seconds`);
await new Promise((r) => setTimeout(r, retryIn * second));
} else {
throw err;
}
Expand Down Expand Up @@ -280,24 +310,41 @@ export class FileManager {

try {
while (true) {
const file = await connection.invoke({ _: "upload.getFile", location, offset, limit });
let retryIn = 1;
let errorCount = 0;
try {
const file = await connection.invoke({ _: "upload.getFile", location, offset, limit });

if (is("upload.file", file)) {
yield file.bytes;
if (id != null) {
await this.#c.storage.saveFilePart(id, part, file.bytes);
}
++part;
if (file.bytes.length < limit) {
if (is("upload.file", file)) {
yield file.bytes;
if (id != null) {
await this.#c.storage.setFilePartCount(id, part + 1, chunkSize);
await this.#c.storage.saveFilePart(id, part, file.bytes);
}
++part;
if (file.bytes.length < limit) {
if (id != null) {
await this.#c.storage.setFilePartCount(id, part + 1, chunkSize);
}
break;
} else {
offset += BigInt(file.bytes.length);
}
break;
} else {
offset += BigInt(file.bytes.length);
unreachable();
}
} catch (err) {
if (typeof err === "object" && err instanceof AssertionError) {
throw err;
}
++errorCount;
if (errorCount > 20) {
retryIn = 0;
}
await this.#handleError(err, retryIn, `[${id}-${part + 1}]`);
retryIn += 2;
if (retryIn > 11) {
retryIn = 11;
}
} else {
unreachable();
}
}
} finally {
Expand Down
12 changes: 12 additions & 0 deletions client/2_update_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ export class UpdateManager {
}

async fetchState(source: string) {
if (this.#c.cdn) {
return;
}
let state = await this.#c.invoke({ _: "updates.getState" });
const difference = await this.#c.invoke({ ...state, _: "updates.getDifference" });
if (is("updates.difference", difference)) {
Expand Down Expand Up @@ -436,6 +439,9 @@ export class UpdateManager {

#processUpdatesQueue = new Queue("UpdateManager/processUpdates");
processUpdates(updates: Api.Update | Api.Updates, checkGap: boolean, call: Api.AnyObject | null = null, callback?: () => void) {
if (this.#c.cdn) {
return;
}
this.#processUpdatesQueue.add(() => this.#processUpdates(updates, checkGap, call).then(callback));
}

Expand Down Expand Up @@ -629,6 +635,9 @@ export class UpdateManager {
return localState;
}
async recoverUpdateGap(source: string) {
if (this.#c.cdn) {
return;
}
this.#LrecoverUpdateGap.debug(`recovering from update gap [${source}]`);

this.#c.setConnectionState("updating");
Expand Down Expand Up @@ -793,6 +802,9 @@ export class UpdateManager {
}

setUpdateHandler(handler: UpdateHandler) {
if (this.#c.cdn) {
return;
}
this.#updateHandler = handler;
}
}
33 changes: 24 additions & 9 deletions client/5_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
readonly #ignoreOutgoing: boolean | null;
#persistCache: boolean;

#cdn: boolean;
#LsignIn: Logger;
#LpingLoop: Logger;
#LhandleMigrationError: Logger;
Expand Down Expand Up @@ -366,6 +367,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
this.#LpingLoop = L.branch("pingLoop");
this.#LhandleMigrationError = L.branch("[handleMigrationError]");
this.#L$initConncetion = L.branch("#initConnection");
this.#cdn = params?.cdn ?? false;

const c = {
id,
Expand All @@ -390,7 +392,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
parseMode: this.#parseMode,
getCdnConnection: this.#getCdnConnection.bind(this),
getCdnConnectionPool: this.#getCdnConnectionPool.bind(this),
cdn: params?.cdn ?? false,
cdn: this.#cdn,
ignoreOutgoing: this.#ignoreOutgoing,
dropPendingUpdates: params?.dropPendingUpdates,
};
Expand All @@ -417,14 +419,22 @@ export class Client<C extends Context = Context> extends Composer<C> {
};

if (params?.defaultHandlers ?? true) {
let reconnecting = false;
let lastReconnection: Date | null = null;
this.on("connectionState", ({ connectionState }, next) => {
if (connectionState != "notConnected") {
return;
}
if (this.disconnected) {
L.debug("not reconnecting");
return;
}
if (reconnecting) {
return;
}
reconnecting = true;
drop((async () => {
if (connectionState == "notConnected") {
if (this.disconnected) {
L.debug("not reconnecting");
return;
}
try {
let delay = 5;
if (lastReconnection != null && Date.now() - lastReconnection.getTime() <= 10 * second) {
await new Promise((r) => setTimeout(r, delay * second));
Expand All @@ -445,6 +455,8 @@ export class Client<C extends Context = Context> extends Composer<C> {
}
await new Promise((r) => setTimeout(r, delay * second));
}
} finally {
reconnecting = false;
}
})());
return next();
Expand Down Expand Up @@ -537,7 +549,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
});

return {
invoke: client.invoke,
invoke: client.invoke.bind(client),
connect: async () => {
await client.connect();

Expand Down Expand Up @@ -904,7 +916,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
#lastPropagatedConnectionState: ConnectionState | null = null;
#stateChangeHandler: (connected: boolean) => void = ((connected: boolean) => {
const connectionState = connected ? "ready" : "notConnected";
if (this.connected == connected && this.#lastPropagatedConnectionState != connectionState) {
if (this.#lastPropagatedConnectionState != connectionState) {
this.#propagateConnectionState(connectionState);
}
}).bind(this);
Expand Down Expand Up @@ -1196,6 +1208,9 @@ export class Client<C extends Context = Context> extends Composer<C> {
drop(this.#pingLoop());
}
async #pingLoop() {
if (this.#cdn) {
return;
}
this.#pingLoopAbortController = new AbortController();
while (this.connected) {
try {
Expand Down Expand Up @@ -1230,6 +1245,7 @@ export class Client<C extends Context = Context> extends Composer<C> {
while (true) {
try {
if (!this.#connectionInited && !isMtprotoFunction(function_)) {
this.#connectionInited = true;
const result = await this.#client.invoke({
_: "initConnection",
api_id: await this.#getApiId(),
Expand All @@ -1245,7 +1261,6 @@ export class Client<C extends Context = Context> extends Composer<C> {
system_lang_code: this.systemLangCode,
system_version: this.systemVersion,
}, noWait);
this.#connectionInited = true;
this.#L$initConncetion.debug("connection inited");
return result as R | void;
} else {
Expand Down
10 changes: 4 additions & 6 deletions connection/1_connection_tcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,10 @@ export class ConnectionTCP implements Connection {
this.#canRead = false;
break;
} catch (err) {
if (!this.connected) {
this.stateChangeHandler?.(false);
this.#rejectRead();
} else {
L.error(err);
}
this.#canRead = false;
this.stateChangeHandler?.(false);
this.#rejectRead();
L.error(err);
}
} while (this.connected);
this.stateChangeHandler?.(false);
Expand Down

0 comments on commit 02f86fa

Please sign in to comment.