@@ -31,6 +31,7 @@ vi.mock('./page-bulk-export-job-cron', () => {
return {
pageBulkExportJobCronService: {
cleanUpExportJobResources: vi.fn(() => Promise.resolve()),
notifyExportResultAndCleanUp: vi.fn(() => Promise.resolve()),
},
};
});
@@ -1,5 +1,5 @@
import type { HydratedDocument } from 'mongoose';

import { SupportedAction } from '~/interfaces/activity';
import type Crowi from '~/server/crowi';
import { configManager } from '~/server/service/config-manager';
import CronService from '~/server/service/cron';
@@ -57,13 +57,16 @@ class PageBulkExportJobCleanUpCronService extends CronService {
},
});

if (pageBulkExportJobCronService != null) {
await this.cleanUpAndDeleteBulkExportJobs(
expiredExportJobs,
pageBulkExportJobCronService.cleanUpExportJobResources.bind(
pageBulkExportJobCronService,
),
const cleanUp = async (job: PageBulkExportJobDocument) => {
await pageBulkExportJobCronService?.notifyExportResultAndCleanUp(
@arafubeatbox (Contributor, Author), Oct 1, 2025:

Execute notifyExportResultAndCleanUp rather than cleanUpAndDeleteBulkExportJobs, so that the "expired" notification is sent explicitly.
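For readers skimming the diff, here is a minimal standalone sketch of the flow this comment describes; `notify` is a hypothetical stand-in for pageBulkExportJobCronService.notifyExportResultAndCleanUp, whose body is not shown in this PR and is assumed to record the given action and then release the job's resources.

```ts
// Illustrative sketch only; `notify` is a hypothetical stand-in for
// pageBulkExportJobCronService.notifyExportResultAndCleanUp.
type ExpirableJob = { _id: { toString(): string } };

async function expireJobs(
  jobs: ExpirableJob[],
  notify: (action: 'PAGE_BULK_EXPORT_JOB_EXPIRED', job: ExpirableJob) => Promise<void>,
): Promise<void> {
  for (const job of jobs) {
    // Each expired job now triggers an explicit "expired" notification
    // (assumed to clean up the job's resources as well),
    // instead of having its resources cleaned up silently.
    await notify('PAGE_BULK_EXPORT_JOB_EXPIRED', job);
    console.error(`Bulk export job has expired: ${job._id.toString()}`);
  }
}
```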

SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
job,
);
logger.error(`Bulk export job has expired: ${job._id.toString()}`);
};

if (pageBulkExportJobCronService != null) {
await this.cleanUpAndDeleteBulkExportJobs(expiredExportJobs, cleanUp);
}
}

@@ -1,11 +1,13 @@
import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';

export class BulkExportJobExpiredError extends Error {
constructor() {
super('Bulk export job has expired');
constructor(pageBulkExportJob: PageBulkExportJobDocument) {
super(`Bulk export job has expired: ${pageBulkExportJob._id.toString()}`);
}
}

export class BulkExportJobRestartedError extends Error {
export class BulkExportJobStreamDestroyedByCleanupError extends Error {
constructor() {
super('Bulk export job has restarted');
super('Bulk export job stream was destroyed by cleanup');
}
}
@@ -1,6 +1,6 @@
import fs from 'node:fs';
import path from 'node:path';
import type { Readable } from 'node:stream';
import type { Readable, Writable } from 'node:stream';
import type { IUser } from '@growi/core';
import { getIdForRef, isPopulated } from '@growi/core';
import mongoose from 'mongoose';
@@ -26,7 +26,7 @@ import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snaps

import {
BulkExportJobExpiredError,
BulkExportJobRestartedError,
BulkExportJobStreamDestroyedByCleanupError,
} from './errors';
import { requestPdfConverter } from './request-pdf-converter';
import { compressAndUpload } from './steps/compress-and-upload';
@@ -40,7 +40,10 @@ export interface IPageBulkExportJobCronService {
pageBatchSize: number;
maxPartSize: number;
compressExtension: string;
setStreamInExecution(jobId: ObjectIdLike, stream: Readable): void;
setStreamsInExecution(
jobId: ObjectIdLike,
...streams: (Readable | Writable)[]
): void;
removeStreamInExecution(jobId: ObjectIdLike): void;
handleError(
err: Error | null,
@@ -78,10 +81,10 @@ class PageBulkExportJobCronService
// temporal path of local fs to output page files before upload
tmpOutputRootDir = '/tmp/page-bulk-export';

// Keep track of the stream executed for PageBulkExportJob to destroy it on job failure.
// The key is the id of a PageBulkExportJob.
// Keep track of all streams executed for PageBulkExportJob to destroy them on job failure.
// The key is the id of a PageBulkExportJob, value is array of streams.
private streamInExecutionMemo: {
[key: string]: Readable;
[key: string]: (Readable | Writable)[];
Contributor Author comment:

Changed so that both readable and writable streams can be recorded in memory.
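As a standalone illustration of this change (a minimal sketch with simplified names, not the service code itself): both ends of a pipeline are registered under one job id so that cleanup can later destroy them all.

```ts
import { Readable, Writable } from 'node:stream';

// Per-job registry holding every stream of a running export, so that cleanup
// can tear down both the readable and the writable side of a pipeline.
const streamsInExecutionMemo: { [jobId: string]: (Readable | Writable)[] } = {};

function setStreamsInExecution(jobId: string, ...streams: (Readable | Writable)[]): void {
  streamsInExecutionMemo[jobId] = streams;
}

// Usage: a pipeline's source and sink are registered together under one job id.
const source = new Readable({ read() { /* no-op; data would be pushed here */ } });
const sink = new Writable({ write(_chunk, _encoding, callback) { callback(); } });
setStreamsInExecution('example-job-id', source, sink);
```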

} = {};

private parallelExecLimit: number;
@@ -133,22 +136,27 @@ class PageBulkExportJobCronService
}

/**
* Get the stream in execution for a job.
* Get all streams in execution for a job.
* A getter method that includes "undefined" in the return type
*/
getStreamInExecution(jobId: ObjectIdLike): Readable | undefined {
getStreamsInExecution(
jobId: ObjectIdLike,
): (Readable | Writable)[] | undefined {
return this.streamInExecutionMemo[jobId.toString()];
}

/**
* Set the stream in execution for a job
* Set streams in execution for a job
*/
setStreamInExecution(jobId: ObjectIdLike, stream: Readable) {
this.streamInExecutionMemo[jobId.toString()] = stream;
setStreamsInExecution(
jobId: ObjectIdLike,
...streams: (Readable | Writable)[]
) {
this.streamInExecutionMemo[jobId.toString()] = streams;
}

/**
* Remove the stream in execution for a job
* Remove all streams in execution for a job
*/
removeStreamInExecution(jobId: ObjectIdLike) {
delete this.streamInExecutionMemo[jobId.toString()];
@@ -161,7 +169,7 @@ class PageBulkExportJobCronService
async proceedBulkExportJob(pageBulkExportJob: PageBulkExportJobDocument) {
try {
if (pageBulkExportJob.restartFlag) {
await this.cleanUpExportJobResources(pageBulkExportJob, true);
await this.cleanUpExportJobResources(pageBulkExportJob);
pageBulkExportJob.restartFlag = false;
pageBulkExportJob.status = PageBulkExportJobStatus.initializing;
pageBulkExportJob.statusOnPreviousCronExec = undefined;
@@ -226,9 +234,6 @@ class PageBulkExportJobCronService
SupportedAction.ACTION_PAGE_BULK_EXPORT_JOB_EXPIRED,
pageBulkExportJob,
);
} else if (err instanceof BulkExportJobRestartedError) {
logger.info(err.message);
await this.cleanUpExportJobResources(pageBulkExportJob);
} else {
logger.error(err);
await this.notifyExportResultAndCleanUp(
@@ -269,15 +274,24 @@
*/
async cleanUpExportJobResources(
pageBulkExportJob: PageBulkExportJobDocument,
restarted = false,
Contributor Author comment:

Whether the job was restarted used to be needed to distinguish cases when stream errors were handled via handleError, but handleError is no longer called during cleanup, so the flag became unnecessary.

) {
const streamInExecution = this.getStreamInExecution(pageBulkExportJob._id);
if (streamInExecution != null) {
if (restarted) {
streamInExecution.destroy(new BulkExportJobRestartedError());
} else {
streamInExecution.destroy(new BulkExportJobExpiredError());
}
const streamsInExecution = this.getStreamsInExecution(
pageBulkExportJob._id,
);
if (streamsInExecution != null && streamsInExecution.length > 0) {
// Wait for all streams to be destroyed before proceeding with cleanup
await Promise.allSettled(
streamsInExecution.map((stream) => {
if (!stream.destroyed) {
return new Promise<void>((resolve) => {
stream.destroy(new BulkExportJobStreamDestroyedByCleanupError());
stream.once('close', () => resolve());
});
}
return Promise.resolve();
}),
);

this.removeStreamInExecution(pageBulkExportJob._id);
}

@@ -49,7 +49,7 @@ export async function requestPdfConverter(
}

if (new Date() > bulkExportJobExpirationDate) {
throw new BulkExportJobExpiredError();
throw new BulkExportJobExpiredError(pageBulkExportJob);
}

try {
@@ -76,7 +76,7 @@ export async function compressAndUpload(

pageArchiver.directory(this.getTmpOutputDir(pageBulkExportJob), false);
pageArchiver.finalize();
this.setStreamInExecution(pageBulkExportJob._id, pageArchiver);
this.setStreamsInExecution(pageBulkExportJob._id, pageArchiver);

try {
await fileUploadService.uploadAttachment(pageArchiver, attachment);
@@ -11,6 +11,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
import PageBulkExportJob from '../../../models/page-bulk-export-job';
import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
import type { IPageBulkExportJobCronService } from '..';
import { BulkExportJobStreamDestroyedByCleanupError } from '../errors';

async function reuseDuplicateExportIfExists(
this: IPageBulkExportJobCronService,
@@ -100,9 +101,16 @@ export async function createPageSnapshotsAsync(
},
});

this.setStreamInExecution(pageBulkExportJob._id, pagesReadable);
this.setStreamsInExecution(
pageBulkExportJob._id,
pagesReadable,
pageSnapshotsWritable,
);

pipeline(pagesReadable, pageSnapshotsWritable, (err) => {
this.handleError(err, pageBulkExportJob);
// prevent overlapping cleanup
if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) {
this.handleError(err, pageBulkExportJob);
}
@arafubeatbox (Contributor, Author), Oct 1, 2025:

Originally, the BulkExportJobRestartedError and BulkExportJobExpiredError thrown during cleanup were also caught here, and handleError was executed, which sent the notification and ran a duplicate cleanup. With this change, errors raised by cleanup's destroy are no longer handled here.

(The duplicate cleanup existed to cover the case where cleanup completed before the stream finished processing after destroy. To cover that case, the cleanup function now waits for the streams to close instead: 1d807d0)
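A self-contained sketch of the interaction described in this comment, using plain node:stream primitives with simplified names (it is not the service code): cleanup destroys the tracked streams with a sentinel error and waits for each 'close' event, while the pipeline callback ignores that sentinel so no second cleanup or notification is triggered.

```ts
import { Readable, Writable, pipeline } from 'node:stream';

class StreamDestroyedByCleanupError extends Error {}

// A source that stays open until destroyed, and a sink that accepts everything.
const source = new Readable({ read() { /* data would be pushed here */ } });
const sink = new Writable({ write(_chunk, _encoding, callback) { callback(); } });

pipeline(source, sink, (err) => {
  // The sentinel injected by cleanup is ignored here: cleanup already owns the
  // teardown, so running handleError again would duplicate notification and cleanup.
  if (err instanceof StreamDestroyedByCleanupError) return;
  if (err) console.error('pipeline failed:', err);
});

// Cleanup side: destroy every tracked stream and wait until each has closed,
// so cleanup cannot finish while the pipeline is still tearing down.
async function destroyAndAwaitClose(streams: (Readable | Writable)[]): Promise<void> {
  await Promise.allSettled(
    streams.map((stream) => {
      if (stream.destroyed) return Promise.resolve();
      return new Promise<void>((resolve) => {
        stream.once('close', () => resolve());
        stream.destroy(new StreamDestroyedByCleanupError('destroyed by cleanup'));
      });
    }),
  );
}

destroyAndAwaitClose([source, sink]).then(() => console.log('cleanup finished'));
```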

});
}
@@ -20,6 +20,7 @@ import type { PageBulkExportJobDocument } from '../../../models/page-bulk-export
import type { PageBulkExportPageSnapshotDocument } from '../../../models/page-bulk-export-page-snapshot';
import PageBulkExportPageSnapshot from '../../../models/page-bulk-export-page-snapshot';
import type { IPageBulkExportJobCronService } from '..';
import { BulkExportJobStreamDestroyedByCleanupError } from '../errors';

async function convertMdToHtml(
md: string,
@@ -132,9 +133,16 @@ export async function exportPagesToFsAsync(

const pagesWritable = await getPageWritable.bind(this)(pageBulkExportJob);

this.setStreamInExecution(pageBulkExportJob._id, pageSnapshotsReadable);
this.setStreamsInExecution(
pageBulkExportJob._id,
pageSnapshotsReadable,
pagesWritable,
);

pipeline(pageSnapshotsReadable, pagesWritable, (err) => {
this.handleError(err, pageBulkExportJob);
// prevent overlapping cleanup
if (!(err instanceof BulkExportJobStreamDestroyedByCleanupError)) {
this.handleError(err, pageBulkExportJob);
}
});
}