Skip to content

Commit

Permalink
Rewrite to use new attachment db & global async yeild
Browse files Browse the repository at this point in the history
  • Loading branch information
Inrixia committed Apr 26, 2023
1 parent ccaace6 commit 15210f9
Show file tree
Hide file tree
Showing 13 changed files with 1,904 additions and 664 deletions.
1,458 changes: 1,398 additions & 60 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -31,6 +31,7 @@
"sanitize-filename": "^1.6.3",
"semver": "^7.5.0",
"sequelize": "^6.31.0",
"sqlite3": "^5.1.6",
"tough-cookie": "^4.1.2",
"tough-cookie-file-store": "^2.0.3"
},
Expand Down
351 changes: 149 additions & 202 deletions src/Downloader.ts
@@ -1,241 +1,188 @@
import { MultiProgressBars, UpdateOptions } from "multi-progress-bars";
import Video from "./lib/Video.js";
import { VideoState, Video } from "./lib/Video.js";

import { settings, args } from "./lib/helpers.js";
import { MyPlexAccount } from "@ctrl/plex";

import { promisify } from "util";
const sleep = promisify(setTimeout);

type promiseFunction = (f: Promise<void>) => void;

const reset = "\u001b[0m";
const cy = (str: string | number) => `\u001b[36;1m${str}\u001b[0m`;
const gr = (str: string | number) => `\u001b[32;1m${str}\u001b[0m`;
const ye = (str: string | number) => `\u001b[33;1m${str}\u001b[0m`;
const bl = (str: string | number) => `\u001b[34;1m${str}\u001b[0m`;

type DownloadProgress = { total: number; transferred: number; percent: number };
type Task = { video: Video; res: promiseFunction; formattedTitle: string };

const MaxRetries = 5;
const DownloadThreads = 8;

// Ew, I really need to refactor this monster of a class

export default class Downloader {
private mpb?: MultiProgressBars;
private taskQueue: Task[] = [];
private videosProcessing = 0;
private videosProcessed = 0;
private summaryStats: { [key: string]: { totalMB: number; downloadedMB: number; downloadSpeed: number } } = {};

private runQueue = false;

start(): void {
if (this.mpb === undefined && args.headless !== true) this.mpb = new MultiProgressBars({ initMessage: "", anchor: "top" });
if (this.runQueue === false) {
this.runQueue = true;
this.tickQueue();
}
}

private tickQueue(): void {
if (this.runQueue === false) return;
while (this.taskQueue.length !== 0 && this.videosProcessing < DownloadThreads) {
this.videosProcessing++;
const task = this.taskQueue.pop();
if (task !== undefined) {
const processingVideoPromise = this.processVideo(task);
task.res(processingVideoPromise);
processingVideoPromise.then(() => this.videosProcessing-- && this.videosProcessed++ && this.updateSummaryBar());
let mpb: MultiProgressBars;
if (args.headless !== true) mpb = new MultiProgressBars({ initMessage: "", anchor: "bottom" });

let totalVideos = 0;
let completedVideos = 0;
const summaryStats: { [key: string]: { totalMB: number; downloadedMB: number; downloadSpeed: number } } = {
_: { totalMB: 0, downloadedMB: 0, downloadSpeed: 0 },
};

// The number of available slots for making delivery requests,
// limiting the rate of requests to avoid exceeding the API rate limit.
let AvalibleDeliverySlots = DownloadThreads;
const DownloadQueue: (() => void)[] = [];

const getDownloadSempahore = async () => {
totalVideos++;
// If there is an available request slot, proceed immediately
if (AvalibleDeliverySlots > 0) return AvalibleDeliverySlots--;

// Otherwise, wait for a request slot to become available
return new Promise((r) => DownloadQueue.push(() => r(AvalibleDeliverySlots--)));
};

const releaseDownloadSemaphore = () => {
AvalibleDeliverySlots++;
completedVideos++;

// If there are queued requests, resolve the first one in the queue
DownloadQueue.shift()?.();
};

const updateSummaryBar = () => {
if (summaryStats === undefined || args.headless === true) return;
const { totalMB, downloadedMB, downloadSpeed } = Object.values(summaryStats).reduce(
(summary, stats) => {
for (const key in stats) {
summary[key as keyof typeof stats] += stats[key as keyof typeof stats];
}
}
setTimeout(() => this.tickQueue(), 50);
}

stop(): void {
this.runQueue = false;
}

processVideos(videos: Video[]): Array<Promise<void>> {
if (videos.length === 0) return [];

console.log(`> Processing ${videos.length} videos...`);
this.summaryStats = {};
this.videosProcessed = 0;

const processingPromises = videos
.reverse()
.map((video) => new Promise<void>((res) => this.taskQueue.push({ video, res, formattedTitle: this.formatTitle(video) })));

// Handler for when all downloads are done.
Promise.all(processingPromises).then(this.updateSummaryBar.bind(this));
return processingPromises;
}

private updateSummaryBar(): void {
if (this.summaryStats === undefined || args.headless === true) return;
const { totalMB, downloadedMB, downloadSpeed } = Object.values(this.summaryStats).reduce(
(summary, stats) => {
for (const key in stats) {
summary[key as keyof typeof stats] += stats[key as keyof typeof stats];
}
return summary;
},
{ totalMB: 0, downloadedMB: 0, downloadSpeed: 0 }
);
// (videos remaining * avg time to download a video)
const totalVideos = this.taskQueue.length + this.videosProcessed + this.videosProcessing;
const processed = `Processed: ${ye(this.videosProcessed)}/${ye(totalVideos)}`;
const downloaded = `Total Downloaded: ${cy(downloadedMB.toFixed(2))}/${cy(totalMB.toFixed(2) + "MB")}`;
const speed = `Download Speed: ${gr(((downloadSpeed / 1024000) * 8).toFixed(2) + "mb/s")}`;
this.mpb?.setFooter({
message: `${processed} ${downloaded} ${speed}`,
pattern: "",
return summary;
},
{ totalMB: 0, downloadedMB: 0, downloadSpeed: 0 }
);
// (videos remaining * avg time to download a video)
const processed = `Processed: ${ye(completedVideos)}/${ye(totalVideos)}`;
const downloaded = `Total Downloaded: ${cy(downloadedMB.toFixed(2))}/${cy(totalMB.toFixed(2) + "MB")}`;
const speed = `Download Speed: ${gr(((downloadSpeed / 1024000) * 8).toFixed(2) + "mb/s")}`;
mpb?.setFooter({
message: `${processed} ${downloaded} ${speed}`,
pattern: "",
});
};

const log = (formattedTitle: string, barUpdate: UpdateOptions, displayNow = true) => {
if (args.headless === true && displayNow === true && barUpdate.message !== undefined) console.log(`${formattedTitle} - ${barUpdate.message}`);
if (mpb?.getIndex(formattedTitle)) mpb?.updateTask(formattedTitle, barUpdate);
};

const formatTitle = (title: string) => {
let formattedTitle = args.headless === true ? title : title.slice(0, 32);

if (summaryStats !== undefined) while (formattedTitle in summaryStats) formattedTitle = `.${formattedTitle}`.slice(0, 32);

return formattedTitle;
};

export const queueVideo = async (video: Video) => {
await getDownloadSempahore();

processVideo(video).then(releaseDownloadSemaphore);
};

const processVideo = async (video: Video, retries = 0) => {
const fTitle = formatTitle(video.title);
try {
mpb?.addTask(fTitle, {
type: "percentage",
message: "Checking download status...",
});
}

/**
* Log the progress bar for a specific video.
* @param formattedTitle Title of bar to update.
* @param barUpdate Update object to update the bar with.
* @param displayNow If the update should be immediately sent to console (Only applied if running in headless mode)
*/
private log(formattedTitle: string, barUpdate: UpdateOptions, displayNow = false): void {
if (args.headless === true && displayNow === true && barUpdate.message !== undefined) console.log(`${formattedTitle} - ${barUpdate.message}`);
this.mpb?.updateTask(formattedTitle, barUpdate);
}

private formatTitle(video: Video) {
let formattedTitle: string;
if (args.headless === true) formattedTitle = `${video.channel.title} - ${video.title}`;
else formattedTitle = `${video.channel.title} - ${video.title}`.slice(0, 32);

if (this.summaryStats !== undefined) while (formattedTitle in this.summaryStats) formattedTitle = `.${formattedTitle}`.slice(0, 32);
if (settings.extras.saveNfo) await video.saveNfo();
if (settings.extras.downloadArtwork) await video.downloadArtwork();

return formattedTitle;
}

private async processVideo(task: Task, retries = 0): Promise<void> {
const { video, formattedTitle } = task;

this.mpb?.addTask(formattedTitle, {
type: "percentage",
message: "Initalizing...",
});
switch (await video.getState()) {
case VideoState.Missing: {
mpb?.addTask(fTitle, {
type: "percentage",
message: "Waiting on delivery cdn...",
});

try {
if (settings.extras.saveNfo) await video.saveNfo();
if (settings.extras.downloadArtwork) await video.downloadArtwork();
// If the video is already downloaded then just mux its metadata
if (!(await video.isDownloaded())) {
const startTime = Date.now();

const totalBytes: number[] = [];
const downloadedBytes: number[] = [];
const percentage: number[] = [];
if (args.headless === true) console.log(`${fTitle} - Waiting on delivery cdn...`);

const downloadPromises: Promise<void>[] = [];
const downloadRequest = await video.download(settings.floatplane.videoResolution);

const getStats = () => {
downloadRequest.on("downloadProgress", (downloadProgress: DownloadProgress) => {
const timeElapsed = (Date.now() - startTime) / 1000;

// Sum the stats for multi part video downloads
const total = totalBytes.reduce((sum, b) => sum + b, 0);
const transferred = downloadedBytes.reduce((sum, b) => sum + b, 0);

const totalMB = total / 1024000;
const downloadedMB = transferred / 1024000;
const downloadSpeed = transferred / timeElapsed;
const downloadETA = total / downloadSpeed - timeElapsed; // Round to 4 decimals
const totalMB = downloadProgress.total / 1024000;
const downloadedMB = downloadProgress.transferred / 1024000;
const downloadSpeed = downloadProgress.transferred / timeElapsed;
const downloadETA = downloadProgress.total / downloadSpeed - timeElapsed; // Round to 4 decimals

log(
fTitle,
{
percentage: downloadProgress.percent,
message: `${reset}${cy(downloadedMB.toFixed(2))}/${cy(totalMB.toFixed(2) + "MB")} ${gr(((downloadSpeed / 1024000) * 8).toFixed(2) + "mb/s")} ETA: ${bl(
Math.floor(downloadETA / 60) + "m " + (Math.floor(downloadETA) % 60) + "s"
)}`,
},
false
);
summaryStats[fTitle] = { totalMB, downloadedMB, downloadSpeed };
updateSummaryBar();
});

return { totalMB, downloadedMB, downloadSpeed, downloadETA };
};
await new Promise((res, rej) => {
downloadRequest.on("end", res);
downloadRequest.on("error", rej);
});

this.mpb?.addTask(formattedTitle, {
type: "percentage",
message: "Waiting on delivery cdn...",
summaryStats._.downloadedMB = summaryStats[fTitle].downloadedMB;
summaryStats._.totalMB = summaryStats[fTitle].totalMB;
delete summaryStats[fTitle];
}
// eslint-disable-next-line no-fallthrough
case VideoState.Partial: {
log(fTitle, {
percentage: 0.99,
message: "Muxing ffmpeg metadata...",
});
await video.muxffmpegMetadata();

if (args.headless === true) console.log(`${formattedTitle} - Waiting on delivery cdn...`);

let i = 0;
for await (const downloadRequest of video.download(settings.floatplane.videoResolution)) {
downloadRequest.on("end", () => {
const { totalMB, downloadedMB } = getStats();
this.log(
formattedTitle,
{
percentage: percentage.reduce((sum, b) => sum + b, 0) / percentage.length,
message: `${reset}${cy(downloadedMB.toFixed(2))}/${cy(totalMB.toFixed(2) + "MB")} - Waiting on delivery cdn...`,
},
true
);
});

((index) =>
downloadRequest.on("downloadProgress", (downloadProgress: DownloadProgress) => {
totalBytes[index] = downloadProgress.total;
downloadedBytes[index] = downloadProgress.transferred;
percentage[index] = downloadProgress.percent;

const { totalMB, downloadedMB, downloadSpeed, downloadETA } = getStats();

this.log(formattedTitle, {
percentage: percentage.reduce((sum, b) => sum + b, 0) / percentage.length,
message: `${reset}${cy(downloadedMB.toFixed(2))}/${cy(totalMB.toFixed(2) + "MB")} ${gr(((downloadSpeed / 1024000) * 8).toFixed(2) + "mb/s")} ETA: ${bl(
Math.floor(downloadETA / 60) + "m " + (Math.floor(downloadETA) % 60) + "s"
)}`,
});
this.summaryStats[formattedTitle] = { totalMB, downloadedMB, downloadSpeed };
this.updateSummaryBar();
}))(i++);

downloadPromises.push(
new Promise((res, rej) => {
downloadRequest.on("end", res);
downloadRequest.on("error", rej);
})
);
if (settings.postProcessingCommand !== "") {
log(fTitle, { message: `Running post download command "${settings.postProcessingCommand}"...` }, true);
await video.postProcessingCommand().catch((err) => console.log(`An error occurred while executing the postProcessingCommand!\n${err.message}\n`));
}

await Promise.all(downloadPromises);
this.summaryStats[formattedTitle].downloadSpeed = 0;
}
if (!(await video.isMuxed())) {
this.log(
formattedTitle,
{
percentage: 0.99,
message: "Muxing ffmpeg metadata...",
},
true
);
await video.muxffmpegMetadata();
if (settings.plex.enabled) {
const plexApi = await new MyPlexAccount(undefined, undefined, undefined, settings.plex.token).connect();
for (const sectionToUpdate of settings.plex.sectionsToUpdate) {
await (await (await (await (await plexApi.resource(sectionToUpdate.server)).connect()).library()).section(sectionToUpdate.section)).refresh();
}
}
}
if (settings.postProcessingCommand !== "") {
this.log(formattedTitle, { message: `Running post download command "${settings.postProcessingCommand}"...` }, true);
await video.postProcessingCommand().catch((err) => console.log(`An error occurred while executing the postProcessingCommand!\n${err.message}\n`));
// eslint-disable-next-line no-fallthrough
case VideoState.Muxed: {
mpb?.done(fTitle);
setTimeout(() => mpb?.removeTask(fTitle), 10000);
updateSummaryBar();
}
this.log(formattedTitle, { message: `Downloaded!` }, true);

this.mpb?.done(formattedTitle);
setTimeout(() => this.mpb?.removeTask(formattedTitle), 30000);

this.updateSummaryBar();
} catch (error) {
let info;
if (!(error instanceof Error)) info = new Error(`Something weird happened, whatever was thrown was not a error! ${error}`);
else info = error;
// Handle errors when downloading nicely
if (retries < MaxRetries) {
this.log(formattedTitle, { message: `\u001b[31m\u001b[1mERR\u001b[0m: ${info.message} - Retrying in ${retries}s [${retries}/${MaxRetries}]` }, true);

// Wait between retries
await sleep(1000 * (retries + 1));

await this.processVideo(task, ++retries);
} else this.log(formattedTitle, { message: `\u001b[31m\u001b[1mERR\u001b[0m: ${info.message} Max Retries! [${retries}/${MaxRetries}]` }, true);
}
} catch (error) {
let info;
if (!(error instanceof Error)) info = new Error(`Something weird happened, whatever was thrown was not a error! ${error}`);
else info = error;
// Handle errors when downloading nicely
if (retries < MaxRetries) {
log(fTitle, { message: `\u001b[31m\u001b[1mERR\u001b[0m: ${info.message} - Retrying in ${retries}s [${retries}/${MaxRetries}]` });

// Wait between retries
await sleep(1000 * (retries + 1));

await processVideo(video, ++retries);
} else log(fTitle, { message: `\u001b[31m\u001b[1mERR\u001b[0m: ${info.message} Max Retries! [${retries}/${MaxRetries}]` });
}
}
};

0 comments on commit 15210f9

Please sign in to comment.