From b27cb64e1ba62f6383924e6ad540aeb22de04a6e Mon Sep 17 00:00:00 2001 From: Inrixia Date: Sun, 7 Apr 2024 13:11:57 +1200 Subject: [PATCH] Refactor to full yeild & break up Video/Downloader/Attachments --- package-lock.json | 45 +-- package.json | 3 +- src/float.ts | 66 ++-- src/lib/Attachment.ts | 124 +++++++ src/lib/Downloader.ts | 153 --------- src/lib/Subscription.ts | 58 ++-- src/lib/Video.ts | 514 ++++++++--------------------- src/lib/VideoBase.ts | 241 ++++++++++++++ src/lib/defaults.ts | 1 - src/lib/helpers/Selector.ts | 7 + src/lib/helpers/Semaphore.ts | 22 ++ src/lib/helpers/fileExists.ts | 11 + src/lib/helpers/updatePlex.ts | 14 + src/lib/logging/ProgressBars.ts | 36 +- src/lib/logging/ProgressConsole.ts | 9 +- src/lib/logging/ProgressLogger.ts | 19 +- src/lib/prompts/downloader.ts | 22 -- src/lib/prompts/settings.ts | 4 +- src/lib/types.ts | 1 - src/quickStart.ts | 4 +- 20 files changed, 642 insertions(+), 712 deletions(-) create mode 100644 src/lib/Attachment.ts delete mode 100644 src/lib/Downloader.ts create mode 100644 src/lib/VideoBase.ts create mode 100644 src/lib/helpers/Selector.ts create mode 100644 src/lib/helpers/Semaphore.ts create mode 100644 src/lib/helpers/fileExists.ts create mode 100644 src/lib/helpers/updatePlex.ts delete mode 100644 src/lib/prompts/downloader.ts diff --git a/package-lock.json b/package-lock.json index 5d98ae7..7c498e8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,17 +1,16 @@ { "name": "floatplane-plex-downloader", - "version": "5.10.2", + "version": "5.12.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "floatplane-plex-downloader", - "version": "5.10.2", + "version": "5.12.2", "dependencies": { "@ctrl/plex": "^1.5.3", "@inrixia/db": "2.0.2", "@inrixia/helpers": "^2.0.10", - "chalk-template": "^1.1.0", "default-import": "^1.1.5", "dotenv": "^16.4.5", "ffbinaries": "^1.1.6", @@ -1411,31 +1410,6 @@ "url": "https://github.com/chalk/chalk?sponsor=1" } }, - "node_modules/chalk-template": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/chalk-template/-/chalk-template-1.1.0.tgz", - "integrity": "sha512-T2VJbcDuZQ0Tb2EWwSotMPJjgpy1/tGee1BTpUNsGZ/qgNjV2t7Mvu+d4600U564nbLesN1x2dPL+xii174Ekg==", - "dependencies": { - "chalk": "^5.2.0" - }, - "engines": { - "node": ">=14.16" - }, - "funding": { - "url": "https://github.com/chalk/chalk-template?sponsor=1" - } - }, - "node_modules/chalk-template/node_modules/chalk": { - "version": "5.2.0", - "resolved": "https://registry.npmjs.org/chalk/-/chalk-5.2.0.tgz", - "integrity": "sha512-ree3Gqw/nazQAPuJJEy+avdl7QfZMcUvmHIKgEZkGL+xOBzRvup5Hxo6LHuMceSxOabuJLJm5Yp/92R9eMmMvA==", - "engines": { - "node": "^12.17.0 || ^14.13 || >=16.0.0" - }, - "funding": { - "url": "https://github.com/chalk/chalk?sponsor=1" - } - }, "node_modules/chownr": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", @@ -5620,21 +5594,6 @@ "supports-color": "^7.1.0" } }, - "chalk-template": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/chalk-template/-/chalk-template-1.1.0.tgz", - "integrity": "sha512-T2VJbcDuZQ0Tb2EWwSotMPJjgpy1/tGee1BTpUNsGZ/qgNjV2t7Mvu+d4600U564nbLesN1x2dPL+xii174Ekg==", - "requires": { - "chalk": "^5.2.0" - }, - "dependencies": { - "chalk": { - "version": "5.2.0", - "resolved": "https://registry.npmjs.org/chalk/-/chalk-5.2.0.tgz", - "integrity": "sha512-ree3Gqw/nazQAPuJJEy+avdl7QfZMcUvmHIKgEZkGL+xOBzRvup5Hxo6LHuMceSxOabuJLJm5Yp/92R9eMmMvA==" - } - } - }, "chownr": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.4.tgz", diff --git a/package.json b/package.json index dd33630..eb3447c 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,6 @@ "@ctrl/plex": "^1.5.3", "@inrixia/db": "2.0.2", "@inrixia/helpers": "^2.0.10", - "chalk-template": "^1.1.0", "default-import": "^1.1.5", "dotenv": "^16.4.5", "ffbinaries": "^1.1.6", @@ -54,4 +53,4 @@ "pkg": "^5.8.1", "typescript": "^5.4.3" } -} \ No newline at end of file +} diff --git a/src/float.ts b/src/float.ts index 4412ddf..a171bc6 100644 --- a/src/float.ts +++ b/src/float.ts @@ -5,65 +5,54 @@ import { fetchFFMPEG } from "./lib/helpers/fetchFFMPEG.js"; import { defaultSettings } from "./lib/defaults.js"; import { loginFloatplane, User } from "./logins.js"; -import { VideoDownloader } from "./lib/Downloader.js"; -import chalk from "chalk-template"; +import chalk from "chalk"; import type { ContentPost } from "floatplane/content"; -import type { Video } from "./lib/Video.js"; import { fetchSubscriptions } from "./subscriptionFetching.js"; import semver from "semver"; const { gt, diff } = semver; -import { promptVideos } from "./lib/prompts/downloader.js"; - // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore Yes, package.json isnt under src, this is fine import pkg from "../package.json" assert { type: "json" }; import { Self } from "floatplane/user"; -async function fetchSubscriptionVideos(): Promise { - // Function that pops items out of seek and destroy until the array is empty - const posts: Promise[] = []; +async function* seekAndDestroy(): AsyncGenerator { while (settings.floatplane.seekAndDestroy.length > 0) { const guid = settings.floatplane.seekAndDestroy.pop(); if (guid === undefined) continue; console.log(chalk`Seek and Destroy: {red ${guid}}`); - posts.push(fApi.content.post(guid)); + yield fApi.content.post(guid); } - - const newVideos: Video[] = []; - for await (const subscription of fetchSubscriptions()) { - await subscription.deleteOldVideos(); - for await (const video of subscription.fetchNewVideos()) newVideos.push(video); - for await (const video of subscription.seekAndDestroy(await Promise.all(posts))) newVideos.push(video); - } - - // If we havent found any new videos, then reset polling size to 5 to avoid excessive api requests. - if (newVideos.length === 0) settings.floatplane.videosToSearch = defaultSettings.floatplane.videosToSearch; - - return newVideos; } -const queueVideo = VideoDownloader.queueVideo.bind(VideoDownloader); - /** * Main function that triggeres everything else in the script */ const downloadNewVideos = async () => { - let subVideos = await fetchSubscriptionVideos(); - if (settings.extras.promptVideos) { - if (args.headless) { - console.log("Cannot prompt for videos in headless mode! Disabling promptVideos..."); - settings.extras.promptVideos = false; - } else { - subVideos = await promptVideos(subVideos); + const userSubs = fetchSubscriptions(); + + for await (const contentPost of seekAndDestroy()) { + for await (const subscription of userSubs) { + if (contentPost.creator.id === subscription.creatorId) { + for await (const video of subscription.seekAndDestroy(contentPost)) video.download(); + } } } - return Promise.all(subVideos.map(queueVideo)).then(() => { - // Enforce search limits after searching once. - settings.floatplane.videosToSearch = defaultSettings.floatplane.videosToSearch; - }); + + for await (const subscription of userSubs) { + await subscription.deleteOldVideos(); + for await (const video of subscription.fetchNewVideos()) video.download(); + } + + // Enforce search limits after searching once. + settings.floatplane.videosToSearch = defaultSettings.floatplane.videosToSearch; + + if (settings.floatplane.waitForNewVideos === true) { + console.log(`Checking for new videos in 5 minutes...`); + setTimeout(downloadNewVideos, 5 * 60 * 1000); + } }; // Fix for docker @@ -115,13 +104,4 @@ process.on("SIGTERM", process.exit); console.log(chalk`Initalized! Running version {cyan ${DownloaderVersion}} instance {magenta ${user!.id}}`); await downloadNewVideos(); - - if (settings.floatplane.waitForNewVideos === true) { - const waitLoop = async () => { - await downloadNewVideos(); - setTimeout(waitLoop, 5 * 60 * 1000); - console.log(`Checking for new videos in 5 minutes...`); - }; - waitLoop(); - } })(); diff --git a/src/lib/Attachment.ts b/src/lib/Attachment.ts new file mode 100644 index 0000000..75ad63b --- /dev/null +++ b/src/lib/Attachment.ts @@ -0,0 +1,124 @@ +import db from "@inrixia/db"; +import { nPad } from "@inrixia/helpers/math"; +import { ValueOfA } from "@inrixia/helpers/ts"; +import { settings } from "./helpers/index.js"; +import sanitize from "sanitize-filename"; + +import { dirname, basename, extname } from "path"; + +import { rename, readdir } from "fs/promises"; + +type AttachmentInfo = { + partialBytes?: number; + muxedBytes?: number; + filePath: string; + releaseDate: number; + videoTitle: string; +}; + +type AttachmentAttributes = { + attachmentId: string; + videoTitle: string; + channelTitle: string; + releaseDate: Date; +}; + +enum Extensions { + Muxed = ".mp4", + Partial = ".partial", + NFO = ".nfo", + Thumbnail = ".png", +} + +export class Attachment implements AttachmentAttributes { + private static readonly AttachmentsDB: Record = db>(`./db/attachments.json`); + public static readonly Extensions = Extensions; + + public readonly attachmentId: string; + public readonly channelTitle: string; + public readonly videoTitle: string; + public readonly releaseDate: Date; + + public readonly filePath: string; + public readonly folderPath: string; + + public readonly artworkPath: string; + public readonly nfoPath: string; + public readonly partialPath: string; + public readonly muxedPath: string; + + constructor({ attachmentId, channelTitle, videoTitle, releaseDate }: AttachmentAttributes) { + this.attachmentId = attachmentId; + this.channelTitle = channelTitle; + this.releaseDate = releaseDate; + this.videoTitle = videoTitle; + + this.filePath = this.formatFilePath(settings.filePathFormatting) + .split("/") + .map((pathPart) => (pathPart.startsWith(".") ? pathPart : sanitize(pathPart))) + .join("/"); + + // Ensure filePath is not exceeding maximum length + if (this.filePath.length > 250) this.filePath = this.filePath.substring(0, 250); + + this.folderPath = this.filePath.substring(0, this.filePath.lastIndexOf("/")); + + this.artworkPath = `${this.filePath}${settings.artworkSuffix}`; + this.nfoPath = `${this.filePath}${Extensions.NFO}`; + this.partialPath = `${this.filePath}${Extensions.Partial}`; + this.muxedPath = `${this.filePath}${Extensions.Muxed}`; + + const attachmentInfo = (Attachment.AttachmentsDB[this.attachmentId] ??= { + releaseDate: this.releaseDate.getTime(), + filePath: this.filePath, + videoTitle: this.videoTitle, + }); + // If the attachment existed on another path then move it. + if (attachmentInfo.filePath !== this.filePath) { + rename(this.artworkPath.replace(this.filePath, attachmentInfo.filePath), this.artworkPath).catch(() => null); + rename(this.partialPath.replace(this.filePath, attachmentInfo.filePath), this.partialPath).catch(() => null); + rename(this.muxedPath.replace(this.filePath, attachmentInfo.filePath), this.muxedPath).catch(() => null); + rename(this.nfoPath.replace(this.filePath, attachmentInfo.filePath), this.nfoPath).catch(() => null); + attachmentInfo.filePath = this.filePath; + } + if (attachmentInfo.videoTitle !== this.videoTitle) attachmentInfo.videoTitle = this.videoTitle; + } + + public static find(filter: (video: AttachmentInfo) => boolean) { + return Object.values(this.AttachmentsDB).filter(filter); + } + public attachmentInfo(): AttachmentInfo { + return Attachment.AttachmentsDB[this.attachmentId]; + } + + public static FilePathOptions = ["%channelTitle%", "%year%", "%month%", "%day%", "%hour%", "%minute%", "%second%", "%videoTitle%"] as const; + protected formatFilePath(string: string): string { + const formatLookup: Record, string> = { + "%channelTitle%": this.channelTitle, + "%year%": this.releaseDate.getFullYear().toString(), + "%month%": nPad(this.releaseDate.getMonth() + 1), + "%day%": nPad(this.releaseDate.getDate()), + "%hour%": nPad(this.releaseDate.getHours()), + "%minute%": nPad(this.releaseDate.getMinutes()), + "%second%": nPad(this.releaseDate.getSeconds()), + "%videoTitle%": this.videoTitle.replace(/ - /g, " ").replace(/\//g, " ").replace(/\\/g, " "), + }; + + for (const [match, value] of Object.entries(formatLookup)) { + string = string.replace(new RegExp(match, "g"), value); + } + return string; + } + + public async artworkFileExtension() { + const fileDir = dirname(this.artworkPath); + const fileName = basename(this.artworkPath); + + const filesInDir = await readdir(fileDir); + const matchingFile = filesInDir.find( + (file) => file.startsWith(fileName) && !file.endsWith(Extensions.NFO) && !file.endsWith(Extensions.Partial) && !file.endsWith(Extensions.Muxed), + ); + if (matchingFile) return extname(matchingFile); + return undefined; + } +} diff --git a/src/lib/Downloader.ts b/src/lib/Downloader.ts deleted file mode 100644 index cf9d673..0000000 --- a/src/lib/Downloader.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { Counter, Gauge } from "prom-client"; -import { Video } from "./Video.js"; -import type { Progress } from "got"; - -import { settings, args } from "./helpers/index.js"; -import { MyPlexAccount } from "@ctrl/plex"; - -import { ProgressHeadless } from "./logging/ProgressConsole.js"; -import { ProgressBars } from "./logging/ProgressBars.js"; - -import { promisify } from "util"; -const sleep = promisify(setTimeout); - -const promQueued = new Gauge({ - name: "queued", - help: "Videos waiting to download", -}); -const promErrors = new Counter({ - name: "errors", - help: "Video errors", - labelNames: ["message"], -}); -const promDownloadedTotal = new Counter({ - name: "downloaded_total", - help: "Videos downloaded", -}); - -export class VideoDownloader { - private static readonly MaxRetries = 5; - private static readonly DownloadThreads = 8; - - // The number of available slots for making delivery requests, - // limiting the rate of requests to avoid exceeding the API rate limit. - private static AvalibleDeliverySlots = this.DownloadThreads; - private static readonly DownloadQueue: (() => void)[] = []; - - private static readonly ProgressLogger = args.headless ? ProgressHeadless : ProgressBars; - private static async getDownloadSempahore() { - // If there is an available request slot, proceed immediately - if (this.AvalibleDeliverySlots > 0) return this.AvalibleDeliverySlots--; - - // Otherwise, wait for a request slot to become available - return new Promise((r) => this.DownloadQueue.push(() => r(this.AvalibleDeliverySlots--))); - } - - private static releaseDownloadSemaphore() { - this.AvalibleDeliverySlots++; - - // If there are queued requests, resolve the first one in the queue - this.DownloadQueue.shift()?.(); - } - - public static async queueVideo(video: Video) { - this.ProgressLogger.TotalVideos++; - promQueued.inc(); - await this.getDownloadSempahore(); - await this.processVideo(video); - await this.releaseDownloadSemaphore(); - promQueued.dec(); - } - - private static async processVideo(video: Video) { - const logger = new this.ProgressLogger(video.title); - - for (let retries = 1; retries < this.MaxRetries + 1; retries++) { - try { - if (settings.extras.saveNfo) { - logger.log("Saving .nfo"); - await video.saveNfo(); - } - if (settings.extras.downloadArtwork) { - logger.log("Saving artwork"); - await video.downloadArtwork(); - } - - switch (await video.getState()) { - case Video.State.Missing: { - logger.log("Waiting on delivery cdn..."); - - const downloadRequest = await video.download(settings.floatplane.videoResolution); - - let downloadInterval: NodeJS.Timeout; - downloadRequest.once("downloadProgress", (downloadProgress: Progress) => { - logger.log("Starting download..."); - downloadInterval = setInterval(() => logger.onDownloadProgress(downloadRequest.downloadProgress), 125); - logger.onDownloadProgress(downloadProgress); - }); - - await new Promise((res, rej) => { - downloadRequest.once("end", res); - downloadRequest.once("error", rej); - }).finally(() => { - clearInterval(downloadInterval); - logger.onDownloadProgress(downloadRequest.downloadProgress); - logger.log("Finished download..."); - }); - } - // eslint-disable-next-line no-fallthrough - case Video.State.Partial: { - logger.log("Muxing ffmpeg metadata..."); - await video.muxffmpegMetadata(); - - if (settings.postProcessingCommand !== "") { - logger.log(`Running post download command "${settings.postProcessingCommand}"...`); - await video.postProcessingCommand().catch((err) => logger.log(`postProcessingCommand failed! ${err.message}\n`)); - } - - if (settings.plex.enabled) { - await this.updatePlex().catch((err) => { - throw new Error(`Updating plex failed! ${err.message}`); - }); - } - } - // eslint-disable-next-line no-fallthrough - case Video.State.Muxed: { - this.ProgressLogger.CompletedVideos++; - logger.done("Download & Muxing complete!"); - promDownloadedTotal.inc(); - } - } - } catch (error) { - let message = error instanceof Error ? error.message : `Something weird happened, whatever was thrown was not a error! ${error}`; - if (message.includes("ffmpeg")) { - const lastIndex = message.lastIndexOf(".partial"); - if (lastIndex !== -1) { - message = `ffmpeg${message.substring(lastIndex + 9).replace(/\n|\r/g, "")}`; - } - } - promErrors.labels({ message: message }).inc(); - - if (retries < this.MaxRetries) { - logger.error(`${message} - Retrying in ${retries}s [${retries}/${this.MaxRetries}]`); - // Wait between retries - await sleep(1000 * retries); - } else { - logger.error(`${message} - Max Retries! [${retries}/${this.MaxRetries}]`, true); - } - } - } - } - - private static plexApi: MyPlexAccount; - private static async updatePlex() { - if (this.plexApi === undefined) this.plexApi = await new MyPlexAccount(undefined, undefined, undefined, settings.plex.token).connect(); - for (const sectionToUpdate of settings.plex.sectionsToUpdate) { - const resource = await this.plexApi.resource(sectionToUpdate.server); - const server = await resource.connect(); - const library = await server.library(); - const section = await library.section(sectionToUpdate.section); - await section.refresh(); - } - } -} diff --git a/src/lib/Subscription.ts b/src/lib/Subscription.ts index 7a5a838..f7ff4e3 100644 --- a/src/lib/Subscription.ts +++ b/src/lib/Subscription.ts @@ -7,10 +7,11 @@ import type { ChannelOptions, SubscriptionSettings } from "./types.js"; import type { ContentPost, VideoContent } from "floatplane/content"; import type { BlogPost } from "floatplane/creator"; -import { Video } from "./Video.js"; +import { VideoBase } from "./VideoBase.js"; import { settings } from "./helpers/index.js"; import { ItemCache } from "./Caches.js"; +import { Video } from "./Video.js"; const removeRepeatedSentences = (postTitle: string, attachmentTitle: string) => { const separators = /(?:\s+|^)((?:[^.,;:!?-]+[\s]*[.,;:!?-]+)+)(?:\s+|$)/g; @@ -63,7 +64,7 @@ export default class Subscription { let deletedFiles = 0; let deletedVideos = 0; - for (const video of Video.GetChannelVideos((video) => video.releaseDate < ignoreBeforeTimestamp && video.channelTitle === channel.title)) { + for (const video of VideoBase.find((video) => video.releaseDate < ignoreBeforeTimestamp && video.videoTitle === channel.title)) { deletedVideos++; const deletionResults = await Promise.allSettled([ rm(`${video.filePath}.mp4`), @@ -134,7 +135,14 @@ export default class Subscription { videoTitle = videoTitle.trim(); } - yield new Video(post, attachmentId, channel.title, videoTitle, dateOffset * 1000); + yield Video.getOrCreate({ + attachmentId, + description: post.text, + artworkUrl: post.thumbnail?.path, + channelTitle: channel.title, + videoTitle, + releaseDate: new Date(new Date(blogPost.releaseDate).getTime() + dateOffset * 1000), + }); break; } } @@ -146,7 +154,7 @@ export default class Subscription { console.log(chalk`Searching for new videos in {yellow ${this.plan}}`); for await (const blogPost of Subscription.PostIterable(this.creatorId, { hasVideo: true })) { for await (const video of this.matchChannel(blogPost)) { - if ((await video.getState()) !== Video.State.Muxed) yield video; + yield video; } // Stop searching if we have looked through videosToSearch @@ -154,29 +162,25 @@ export default class Subscription { } } - public async *seekAndDestroy(contentPosts: ContentPost[]): AsyncGenerator