Skip to content

Commit

Permalink
Merge pull request #388 from ekalinin/respect-backpressure
Browse files Browse the repository at this point in the history
respect backpressure
  • Loading branch information
derduher committed Jan 30, 2022
2 parents dad6992 + 0391b30 commit 815680c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 17 deletions.
22 changes: 14 additions & 8 deletions lib/sitemap-index-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ export class SitemapAndIndexStream extends SitemapIndexStream {
this.limit = opts.limit ?? 45000;
}

_writeSMI(item: SitemapItemLoose): void {
this.currentSitemap.write(item);
_writeSMI(item: SitemapItemLoose, callback: () => void): void {
this.i++;
if (!this.currentSitemap.write(item)) {
this.currentSitemap.once('drain', callback);
} else {
process.nextTick(callback);
}
}

_transform(
Expand All @@ -113,25 +117,27 @@ export class SitemapAndIndexStream extends SitemapIndexStream {
callback: TransformCallback
): void {
if (this.i === 0) {
this._writeSMI(item);
super._transform(this.idxItem, encoding, callback);
this._writeSMI(item, () =>
super._transform(this.idxItem, encoding, callback)
);
} else if (this.i % this.limit === 0) {
const onFinish = () => {
const [idxItem, currentSitemap, currentSitemapPipeline] =
this.getSitemapStream(this.i / this.limit);
this.currentSitemap = currentSitemap;
this.currentSitemapPipeline = currentSitemapPipeline;
this._writeSMI(item);
// push to index stream
super._transform(idxItem, encoding, callback);
this._writeSMI(item, () =>
// push to index stream
super._transform(idxItem, encoding, callback)
);
};
this.currentSitemapPipeline?.on('finish', onFinish);
this.currentSitemap.end(
!this.currentSitemapPipeline ? onFinish : undefined
);
} else {
this._writeSMI(item);
callback();
this._writeSMI(item, callback);
}
}

Expand Down
9 changes: 7 additions & 2 deletions lib/sitemap-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,17 @@ export class XMLToSitemapItemStream extends Transform {
callback: TransformCallback
): void {
try {
const cb = () =>
callback(this.level === ErrorLevel.THROW ? this.error : null);
// correcting the type here can be done without making it a breaking change
// TODO fix this
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
this.saxStream.write(data, encoding);
callback(this.level === ErrorLevel.THROW ? this.error : null);
if (!this.saxStream.write(data, encoding)) {
this.saxStream.once('drain', cb);
} else {
process.nextTick(cb);
}
} catch (error) {
callback(error as Error);
}
Expand Down
19 changes: 12 additions & 7 deletions lib/sitemap-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,19 @@ export class SitemapStream extends Transform {
this.hasHeadOutput = true;
this.push(getURLSetNs(this.xmlNS, this.xslUrl));
}
this.smiStream.write(
validateSMIOptions(
normalizeURL(item, this.hostname, this.lastmodDateOnly),
this.level,
this.errorHandler
if (
!this.smiStream.write(
validateSMIOptions(
normalizeURL(item, this.hostname, this.lastmodDateOnly),
this.level,
this.errorHandler
)
)
);
callback();
) {
this.smiStream.once('drain', callback);
} else {
process.nextTick(callback);
}
}

_flush(cb: TransformCallback): void {
Expand Down
1 change: 1 addition & 0 deletions tests/sitemap-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ describe('sitemap stream', () => {
sms.write(source[0]);
sms.write(source[1]);
sms.end();
await new Promise((resolve) => sms.on('finish', resolve));
expect(errorHandlerMock.mock.calls.length).toBe(1);
expect((await streamToPromise(sms)).toString()).toBe(
preamble +
Expand Down

0 comments on commit 815680c

Please sign in to comment.