diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.integ.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.integ.ts index 95ed28ee832..ce9d6c7d9a5 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.integ.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.integ.ts @@ -31,6 +31,7 @@ vi.mock('./page-bulk-export-job-cron', () => { return { pageBulkExportJobCronService: { cleanUpExportJobResources: vi.fn(() => Promise.resolve()), + notifyExportResultAndCleanUp: vi.fn(() => Promise.resolve()), }, }; }); diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts index 8f4ffac33f3..29049a3a7ac 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-clean-up-cron.ts @@ -1,5 +1,5 @@ import type { HydratedDocument } from 'mongoose'; - +import { SupportedAction } from '~/interfaces/activity'; import type Crowi from '~/server/crowi'; import { configManager } from '~/server/service/config-manager'; import CronService from '~/server/service/cron'; @@ -57,13 +57,16 @@ class PageBulkExportJobCleanUpCronService extends CronService { }, }); - if (pageBulkExportJobCronService != null) { - await this.cleanUpAndDeleteBulkExportJobs( - expiredExportJobs, - pageBulkExportJobCronService.cleanUpExportJobResources.bind( - pageBulkExportJobCronService, - ), + const cleanUp = async (job: PageBulkExportJobDocument) => { + await pageBulkExportJobCronService?.notifyExportResultAndCleanUp( + SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, + job, ); + logger.error(`Bulk export job has expired: ${job._id.toString()}`); + }; + + if (pageBulkExportJobCronService != null) { + await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, cleanUp); } } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts index 405d55ff86f..1270b34632f 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/errors.ts @@ -1,11 +1,13 @@ +import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job'; + export class BulkExportJobExpiredError extends Error { - constructor() { - super('Bulk export job has expired'); + constructor(pageBulkExportJob: PageBulkExportJobDocument) { + super(`Bulk export job has expired: ${pageBulkExportJob._id.toString()}`); } } -export class BulkExportJobRestartedError extends Error { +export class BulkExportJobStreamDestroyedByCleanupError extends Error { constructor() { - super('Bulk export job has restarted'); + super('Bulk export job stream was destroyed by cleanup'); } } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts index 4db8c8a7ad0..28342737eba 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/index.ts @@ -1,6 +1,6 @@ import fs from 'node:fs'; import path from 'node:path'; -import type { Readable } from 'node:stream'; +import type { Readable, Writable } from 'node:stream'; import type { IUser } from '@growi/core'; import { getIdForRef, isPopulated } from '@growi/core'; import mongoose from 'mongoose'; @@ -26,7 +26,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps import { BulkExportJobExpiredError, - BulkExportJobRestartedError, + BulkExportJobStreamDestroyedByCleanupError, } from './errors'; import { requestPdfConverter } from './request-pdf-converter'; import { compressAndUpload } from './steps/compress-and-upload'; @@ -40,7 +40,10 @@ export interface IPageBulkExportJobCronService { pageBatchSize: number; maxPartSize: number; compressExtension: string; - setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void; + setStreamsInExecution( + jobId: ObjectIdLike, + ...streams: (Readable | Writable)[] + ): void; removeStreamInExecution(jobId: ObjectIdLike): void; handleError( err: Error | null, @@ -78,10 +81,10 @@ class PageBulkExportJobCronService // temporal path of local fs to output page files before upload tmpOutputRootDir = '/tmp/page-bulk-export'; - // Keep track of the stream executed for PageBulkExportJob to destroy it on job failure. - // The key is the id of a PageBulkExportJob. + // Keep track of all streams executed for PageBulkExportJob to destroy them on job failure. + // The key is the id of a PageBulkExportJob, value is array of streams. private streamInExecutionMemo: { - [key: string]: Readable; + [key: string]: (Readable | Writable)[]; } = {}; private parallelExecLimit: number; @@ -133,22 +136,27 @@ class PageBulkExportJobCronService } /** - * Get the stream in execution for a job. + * Get all streams in execution for a job. * A getter method that includes "undefined" in the return type */ - getStreamInExecution(jobId: ObjectIdLike): Readable | undefined { + getStreamsInExecution( + jobId: ObjectIdLike, + ): (Readable | Writable)[] | undefined { return this.streamInExecutionMemo[jobId.toString()]; } /** - * Set the stream in execution for a job + * Set streams in execution for a job */ - setStreamInExecution(jobId: ObjectIdLike, stream: Readable) { - this.streamInExecutionMemo[jobId.toString()] = stream; + setStreamsInExecution( + jobId: ObjectIdLike, + ...streams: (Readable | Writable)[] + ) { + this.streamInExecutionMemo[jobId.toString()] = streams; } /** - * Remove the stream in execution for a job + * Remove all streams in execution for a job */ removeStreamInExecution(jobId: ObjectIdLike) { delete this.streamInExecutionMemo[jobId.toString()]; @@ -161,7 +169,7 @@ class PageBulkExportJobCronService async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) { try { if (pageBulkExportJob.restartFlag) { - await this.cleanUpExportJobResources(pageBulkExportJob, true); + await this.cleanUpExportJobResources(pageBulkExportJob); pageBulkExportJob.restartFlag = false; pageBulkExportJob.status = PageBulkExportJobStatus.initializing; pageBulkExportJob.statusOnPreviousCronExec = undefined; @@ -226,9 +234,6 @@ class PageBulkExportJobCronService SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED, pageBulkExportJob, ); - } else if (err instanceof BulkExportJobRestartedError) { - logger.info(err.message); - await this.cleanUpExportJobResources(pageBulkExportJob); } else { logger.error(err); await this.notifyExportResultAndCleanUp( @@ -269,15 +274,24 @@ class PageBulkExportJobCronService */ async cleanUpExportJobResources( pageBulkExportJob: PageBulkExportJobDocument, - restarted = false, ) { - const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id); - if (streamInExecution != null) { - if (restarted) { - streamInExecution.destroy(new BulkExportJobRestartedError()); - } else { - streamInExecution.destroy(new BulkExportJobExpiredError()); - } + const streamsInExecution = this.getStreamsInExecution( + pageBulkExportJob._id, + ); + if (streamsInExecution != null && streamsInExecution.length > 0) { + // Wait for all streams to be destroyed before proceeding with cleanup + await Promise.allSettled( + streamsInExecution.map((stream) => { + if (!stream.destroyed) { + return new Promise((resolve) => { + stream.destroy(new BulkExportJobStreamDestroyedByCleanupError()); + stream.once('close', () => resolve()); + }); + } + return Promise.resolve(); + }), + ); + this.removeStreamInExecution(pageBulkExportJob._id); } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/request-pdf-converter.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/request-pdf-converter.ts index ac2b8a2bf05..908ec7d4c42 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/request-pdf-converter.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/request-pdf-converter.ts @@ -49,7 +49,7 @@ export async function requestPdfConverter( } if (new Date() > bulkExportJobExpirationDate) { - throw new BulkExportJobExpiredError(); + throw new BulkExportJobExpiredError(pageBulkExportJob); } try { diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts index bccbe8ffebf..57fe8091698 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/compress-and-upload.ts @@ -76,7 +76,7 @@ export async function compressAndUpload( pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false); pageArchiver.finalize(); - this.setStreamInExecution(pageBulkExportJob._id, pageArchiver); + this.setStreamsInExecution(pageBulkExportJob._id, pageArchiver); try { await fileUploadService.uploadAttachment(pageArchiver, attachment); diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts index b2e2d40eb10..4111b29b13b 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/create-page-snapshots-async.ts @@ -11,6 +11,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export import PageBulkExportJob from '../../../models/page-bulk-export-job'; import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot'; import type { IPageBulkExportJobCronService } from '..'; +import { BulkExportJobStreamDestroyedByCleanupError } from '../errors'; async function reuseDuplicateExportIfExists( this: IPageBulkExportJobCronService, @@ -100,9 +101,16 @@ export async function createPageSnapshotsAsync( }, }); - this.setStreamInExecution(pageBulkExportJob._id, pagesReadable); + this.setStreamsInExecution( + pageBulkExportJob._id, + pagesReadable, + pageSnapshotsWritable, + ); pipeline(pagesReadable, pageSnapshotsWritable, (err) => { - this.handleError(err, pageBulkExportJob); + // prevent overlapping cleanup + if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) { + this.handleError(err, pageBulkExportJob); + } }); } diff --git a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts index 00f83fcb8f3..1fa31c15063 100644 --- a/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts +++ b/apps/app/src/features/page-bulk-export/server/service/page-bulk-export-job-cron/steps/export-pages-to-fs-async.ts @@ -20,6 +20,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export import type { PageBulkExportPageSnapshotDocument } from '../../../models/page-bulk-export-page-snapshot'; import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot'; import type { IPageBulkExportJobCronService } from '..'; +import { BulkExportJobStreamDestroyedByCleanupError } from '../errors'; async function convertMdToHtml( md: string, @@ -132,9 +133,16 @@ export async function exportPagesToFsAsync( const pagesWritable = await getPageWritable.bind(this)(pageBulkExportJob); - this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable); + this.setStreamsInExecution( + pageBulkExportJob._id, + pageSnapshotsReadable, + pagesWritable, + ); pipeline(pageSnapshotsReadable, pagesWritable, (err) => { - this.handleError(err, pageBulkExportJob); + // prevent overlapping cleanup + if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) { + this.handleError(err, pageBulkExportJob); + } }); }