Skip to content

Commit

Permalink
Issue-423 - Fix failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
huntharo committed May 22, 2024
1 parent c412f37 commit 6d9d10b
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 53 deletions.
178 changes: 126 additions & 52 deletions lib/sitemap-index-stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { WriteStream } from 'fs';
import { Transform, TransformOptions, TransformCallback } from 'stream';
import { IndexItem, SitemapItemLoose, ErrorLevel } from './types';
import { SitemapStream, stylesheetInclude } from './sitemap-stream';
import { element, otag, ctag } from './sitemap-xml';
import { WriteStream } from 'fs';

export enum IndexTagNames {
sitemap = 'sitemap',
Expand Down Expand Up @@ -36,18 +36,22 @@ export class SitemapIndexStream extends Transform {
this.xslUrl = opts.xslUrl;
}

private writeHeadOutput(): void {
this.hasHeadOutput = true;
let stylesheet = '';
if (this.xslUrl) {
stylesheet = stylesheetInclude(this.xslUrl);
}
this.push(xmlDec + stylesheet + sitemapIndexTagStart);
}

_transform(
item: IndexItem | string,
encoding: string,
callback: TransformCallback
): void {
if (!this.hasHeadOutput) {
this.hasHeadOutput = true;
let stylesheet = '';
if (this.xslUrl) {
stylesheet = stylesheetInclude(this.xslUrl);
}
this.push(xmlDec + stylesheet + sitemapIndexTagStart);
this.writeHeadOutput();
}
this.push(otag(IndexTagNames.sitemap));
if (typeof item === 'string') {
Expand All @@ -69,11 +73,29 @@ export class SitemapIndexStream extends Transform {
}

_flush(cb: TransformCallback): void {
if (!this.hasHeadOutput) {
this.writeHeadOutput();
}

this.push(closetag);
cb();
}
}

/**
* Callback for SitemapIndexAndStream that creates a new sitemap stream for a given sitemap index.
*
* Called when a new sitemap file is needed.
*
* The write stream is the destination where the sitemap was piped.
* SitemapAndIndexStream will wait for the `finish` event on each sitemap's
* write stream before moving on to the next sitemap. This ensures that the
* contents of the write stream will be fully written before being used
* by any following operations (e.g. uploading, reading contents for unit tests).
*
* @param i - The index of the sitemap file
* @returns A tuple containing the index item to be written into the sitemap index, the sitemap stream, and the write stream for the sitemap pipe destination
*/
type getSitemapStream = (
i: number
) => [IndexItem | string, SitemapStream, WriteStream];
Expand All @@ -84,70 +106,122 @@ export interface SitemapAndIndexStreamOptions
limit?: number;
getSitemapStream: getSitemapStream;
}
// const defaultSIStreamOpts: SitemapAndIndexStreamOptions = {};

export class SitemapAndIndexStream extends SitemapIndexStream {
private i: number;
private itemsWritten: number;
private getSitemapStream: getSitemapStream;
private currentSitemap: SitemapStream;
private currentSitemapPipeline?: WriteStream;
private idxItem: IndexItem | string;
private currentSitemap?: SitemapStream;
private limit: number;
private currentSitemapPipeline?: WriteStream;

constructor(opts: SitemapAndIndexStreamOptions) {
opts.objectMode = true;
super(opts);
this.i = 0;
this.itemsWritten = 0;
this.getSitemapStream = opts.getSitemapStream;
[this.idxItem, this.currentSitemap, this.currentSitemapPipeline] =
this.getSitemapStream(0);
this.currentSitemap.on('error', (err) => this.emit('error', err));
this.limit = opts.limit ?? 45000;
}

_writeSMI(item: SitemapItemLoose, callback: () => void): void {
this.i++;
if (!this.currentSitemap.write(item)) {
this.currentSitemap.once('drain', callback);
} else {
process.nextTick(callback);
}
}

_transform(
item: SitemapItemLoose,
encoding: string,
callback: TransformCallback
): void {
if (this.i === 0) {
this._writeSMI(item, () =>
super._transform(this.idxItem, encoding, callback)
);
} else if (this.i % this.limit === 0) {
const onFinish = () => {
const [idxItem, currentSitemap, currentSitemapPipeline] =
this.getSitemapStream(this.i / this.limit);
currentSitemap.on('error', (err) => this.emit('error', err));
this.currentSitemap = currentSitemap;
this.currentSitemapPipeline = currentSitemapPipeline;
// push to index stream
this._writeSMI(item, () =>
// push to index stream
super._transform(idxItem, encoding, callback)
);
};
this.currentSitemapPipeline?.on('finish', onFinish);
this.currentSitemap.end(
!this.currentSitemapPipeline ? onFinish : undefined
);
if (this.itemsWritten % this.limit === 0) {
if (this.currentSitemap) {
const onFinish = new Promise<void>((resolve, reject) => {
this.currentSitemap?.on('finish', resolve);
this.currentSitemap?.on('error', reject);
this.currentSitemap?.end();
});

const onPipelineFinish = this.currentSitemapPipeline
? new Promise<void>((resolve, reject) => {
this.currentSitemapPipeline?.on('finish', resolve);
this.currentSitemapPipeline?.on('error', reject);
})
: Promise.resolve();

Promise.all([onFinish, onPipelineFinish])
.then(() => {
this.createSitemap(encoding);
this.writeItem(item, callback);
})
.catch(callback);
return;
} else {
this.createSitemap(encoding);
}
}

this.writeItem(item, callback);
}

private writeItem(item: SitemapItemLoose, callback: TransformCallback): void {
if (!this.currentSitemap) {
callback(new Error('No sitemap stream available'));
return;
}

if (!this.currentSitemap.write(item)) {
this.currentSitemap.once('drain', callback);
} else {
this._writeSMI(item, callback);
process.nextTick(callback);
}

// Increment the count of items written
this.itemsWritten++;
}

/**
* Called when the stream is finished.
* If there is a current sitemap, we wait for it to finish before calling the callback.
*
* @param cb
*/
_flush(cb: TransformCallback): void {
const onFinish = () => super._flush(cb);
this.currentSitemapPipeline?.on('finish', onFinish);
this.currentSitemap.end(
!this.currentSitemapPipeline ? onFinish : undefined
);
const onFinish = new Promise<void>((resolve, reject) => {
if (this.currentSitemap) {
this.currentSitemap.on('finish', resolve);
this.currentSitemap.on('error', reject);
this.currentSitemap.end();
} else {
resolve();
}
});

const onPipelineFinish = new Promise<void>((resolve, reject) => {
if (this.currentSitemapPipeline) {
this.currentSitemapPipeline.on('finish', resolve);
this.currentSitemapPipeline.on('error', reject);
// The pipeline (pipe target) will get it's end() call
// from the sitemap stream ending.
} else {
resolve();
}
});

Promise.all([onFinish, onPipelineFinish])
.then(() => {
super._flush(cb);
})
.catch((err) => {
cb(err);
});
}

private createSitemap(encoding: string): void {
const [idxItem, currentSitemap, currentSitemapPipeline] =
this.getSitemapStream(this.itemsWritten / this.limit);
currentSitemap.on('error', (err: any) => this.emit('error', err));
this.currentSitemap = currentSitemap;
this.currentSitemapPipeline = currentSitemapPipeline;
super._transform(idxItem, encoding, () => {
// We are not too fussed about waiting for the index item to be written
// we we'll wait for the file to finish at the end
// and index file write volume tends to be small in comprarison to sitemap
// writes.
// noop
});
}
}
2 changes: 1 addition & 1 deletion tests/sitemap-index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ describe('sitemapAndIndex', () => {

function writeData(
sms: SitemapStream | SitemapAndIndexStream,
data: any
data
): Promise<void> {
if (!sms.write(data)) {
return new Promise((resolve) => {
Expand Down

0 comments on commit 6d9d10b

Please sign in to comment.