
csv-parse stream will process the whole buffer even with back pressure. #408

@dobesv

Description


Describe the bug

Using csv-parse@5.5.2, I have found that if I provide a buffer or stream as input to the parser, it will always push every row parsed from a chunk it receives, even when there is back-pressure. Back-pressure is only applied between the chunks it receives from its input.

To Reproduce

```ts
import { parse, Parser } from 'csv-parse';
import { pipeline, Readable, Writable, WritableOptions } from 'stream';

// Create the parser
const parser: Parser = parse();
parser.on('data', (row) => {
    console.log('parser row', row, (parser as any)._readableState.length);
});

// Build a single large input buffer (100,000 rows)
const bufs = [];
for (let i = 0; i < 100000; i++) {
    bufs.push(Buffer.from(`a${i}, b${i}, ${i}\r\n`));
}
const inputBuffer = Buffer.concat(bufs);
const input = Readable.from([inputBuffer]);
input.on('data', (chunk) => {
    console.log('input chunk', chunk.length);
});

class BackpressureWritable extends Writable {
    count: number;

    constructor(options: WritableOptions) {
        super(options);
        this.count = 0;
    }

    // @ts-ignore -- loosen the overloaded write() signature so we can log the result
    write(chunk, encoding, callback) {
        const result = super.write(chunk, encoding, callback);
        console.log(`write(${chunk.toString()}) => ${result}`);
        return result;
    }

    _write(chunk: any, encoding: string, callback: any) {
        this.count++;
        console.log(`_write(${chunk.toString()})`);

        setTimeout(callback, this.count); // Simulate a slow consumer so back-pressure builds up
    }
}

const output = new BackpressureWritable({ objectMode: true, highWaterMark: 1 });
pipeline(input, parser, output, () => {
    console.log('pipeline output');
});

output.on('finish', () => { // Writable emits 'finish', not 'end'
    console.log('finish');
});
```

If you run the above script you will see that `(parser as any)._readableState.length` immediately grows to include all rows: every row is buffered into the Readable half of the parser at once.

In some cases a user of the library may want to pass in a buffer of many megabytes, expecting it to be processed in small batches (say, bounded by the stream's high-water mark). With this library, however, all the rows are parsed and buffered immediately, using far more memory than necessary.
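Until that's fixed, a workaround that follows from the behaviour described above (back-pressure is applied *between* input chunks) is to slice the big buffer into small chunks before piping it in, so at most one small chunk's worth of rows is parsed ahead. A sketch — the `chunked` helper and the 16 KiB default are my own, not part of csv-parse:

```typescript
// Workaround sketch: slice a large buffer into small chunks before piping
// it into the parser, so only one chunk's worth of rows is parsed ahead.
function* chunked(buf: Buffer, size = 16 * 1024): Generator<Buffer> {
    for (let offset = 0; offset < buf.length; offset += size) {
        // subarray() returns a view, so this does not copy the data
        yield buf.subarray(offset, offset + size);
    }
}

// Usage with the repro above:
//   pipeline(Readable.from(chunked(inputBuffer)), parser, output, cb);
```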

In order to fix this, the library should check the return value of push(), and if it is false it should pause parsing even if it has enough input buffered to read another record. I'm not actually sure how to know when it is OK to call push() again, though; the documentation isn't clear on this point.
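For what it's worth, one possible shape for that fix, as a general Node streams pattern: stop calling push() once it returns false, hold on to the _transform callback, and resume pushing from _read(), since _read() is the readable side's signal that downstream wants more data. The `LineSplitter` below is a hypothetical stand-in for the parser (not csv-parse's actual internals):

```typescript
import { Transform, TransformCallback } from 'stream';

// Hypothetical stand-in for the parser, illustrating the pattern only:
// records parsed from a chunk are held in `pending` and pushed one at a
// time; pushing stops as soon as push() returns false and resumes when
// _read() fires.
class LineSplitter extends Transform {
    private pending: string[] = [];
    private pendingCallback: TransformCallback | null = null;

    constructor() {
        super({ readableObjectMode: true, readableHighWaterMark: 2 });
    }

    _transform(chunk: Buffer, _enc: string, callback: TransformCallback) {
        this.pending.push(...chunk.toString().split('\n').filter(Boolean));
        this.pendingCallback = callback; // defer until all records are pushed
        this.drain();
    }

    // Downstream is asking for more data: resume pushing buffered records.
    _read(size: number) {
        this.drain();
        super._read(size);
    }

    private drain() {
        while (this.pending.length > 0) {
            if (!this.push(this.pending.shift())) {
                return; // back-pressure: stop and wait for the next _read()
            }
        }
        if (this.pendingCallback) {
            // All records from the current chunk are flushed; accept the next one.
            const cb = this.pendingCallback;
            this.pendingCallback = null;
            cb();
        }
    }
}
```

With `readableHighWaterMark: 2` here, only a couple of records sit in the readable buffer at a time, rather than every record parsed from the chunk.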

