
Manual paging #89

Closed
wants to merge 4 commits into from

Conversation

@krassif commented Sep 15, 2015

This contains the code changes as per Jorge's comments. Please review and merge.
The DataStax agreement has been accepted as well.

Thanks.

Krassi added 4 commits December 17, 2014 20:12
	in eachRow via fetchSize option spec, resulting in a nextPage method attached to the resultStats in callback(err, resultStats)
	in stream, implicitly via the nodejs stream implementation, or again via fetchSize option spec

fix: according to https://nodejs.org/api/stream.html#stream_readable_push_chunk_encoding:
	"if push() returns false, then we need to stop reading from source", i.e. this.paused = !this.push(this.buffer.shift());
 into manual-paging

Conflicts:
	lib/client.js
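A minimal sketch of the push()-return-value pattern that fix commit describes, reusing the names from the commit message (buffer, paused); this is an illustration of the quoted stream-docs rule, not the driver's actual source:

var stream = require('stream');
var util = require('util');

// A ResultStream-like Readable that buffers rows internally.
function ResultStream(options) {
  stream.Readable.call(this, options);
  this.buffer = [];
  this.paused = false;
}
util.inherits(ResultStream, stream.Readable);

ResultStream.prototype._read = function () {
  this.paused = false;
  while (!this.paused && this.buffer.length > 0) {
    // Per the stream docs: if push() returns false,
    // stop reading from the source until the next _read().
    this.paused = !this.push(this.buffer.shift());
  }
};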
@jorgebay mentioned this pull request Sep 18, 2015
@jorgebay (Contributor)

You want to use krassif:manual-paging as the local branch, right?

Looks good, I will squash the commits.

I will move it to NODEJS-68 dev branch.
We should add at least the following unit tests:

ResultStream:
    It should call _readNext() when buffer length is 0

Client#stream()
    It should set the ResultSet valve after all the rows have been yielded

And some integration tests also.
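A hypothetical sketch of the first unit test on that list, assuming a mocha/assert setup; the require path and the ResultStream constructor API are assumptions, not necessarily how the actual codebase is laid out:

var assert = require('assert');
// Hypothetical path: where ResultStream lives may differ in the real repo.
var ResultStream = require('../../lib/types').ResultStream;

describe('ResultStream', function () {
  it('should call _readNext() when buffer length is 0', function (done) {
    var rs = new ResultStream({ objectMode: true });
    // Set the valve: it should fire once the internal buffer is drained.
    rs._readNext = function () {
      assert.strictEqual(rs.buffer.length, 0);
      done();
    };
    // Trigger a read while the buffer is empty.
    rs._read();
  });
});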

@jorgebay (Contributor)

Squashed and merged into the NODEJS-68 branch, and added two integration tests.

We should continue by adding unit tests. If you want to implement them, make sure to use NODEJS-68 as your target branch.

@jorgebay closed this Sep 18, 2015
@ivan commented Sep 19, 2015

I tested the NODEJS-68 branch with a million-row client.stream('SELECT * ...'), and I don't think it fixes the lack-of-back-pressure issue. I'm still seeing massive memory usage, and with some additional logging I see that the ResultStream's buffer.length climbs to 310700 items.

The resultStream._valve call in Client.prototype.stream doesn't look quite right: it looks like it's happening only in the eachRow error callback. Does that error callback even get a result argument passed in?

@jorgebay (Contributor)

I've added a new integration test on the NODEJS-68 branch (a5a51db) that checks that the stream buffer length is always lower than the fetch size.

Is that what you mean, @ivan? If so, could you describe some steps to reproduce it?

@ivan commented Sep 21, 2015

Sure. I filled a Cassandra database with a million rows (~3GB), called client.stream('SELECT * ...'), used a Transform stream to pipe the rows to stdout, and then piped the stdout of that node program into xz, a slow program that backpressured node (but not client.stream).

Take a look at the code I mentioned; I don't see how it can possibly apply backpressure when the _valve call is only in the error callback.
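A rough sketch of the setup described above; the keyspace/table, file name, and stringifier are illustrative:

var stream = require('stream');
var cassandra = require('cassandra-driver');

var client = new cassandra.Client({ contactPoints: ['127.0.0.1'] });

// Stringify each row object so it can be written to stdout.
var stringifier = new stream.Transform({
  objectMode: true,
  transform: function (row, encoding, callback) {
    callback(null, JSON.stringify(row) + '\n');
  }
});

client.stream('SELECT * FROM ks.big_table') // hypothetical table
  .pipe(stringifier)
  .pipe(process.stdout);

// Then, in a shell: node dump.js | xz > rows.xz
// xz is slow enough to backpressure node's stdout.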

@jorgebay (Contributor) commented Oct 1, 2015

In the case you describe, you should expect buffering, but not directly related to ResultStream.
To pipe to stdout, you will need to use a transform stream in the middle, something like:

resultStream
  .pipe(stringifierStream)
  .pipe(process.stdout);

If that is the case, what is causing the excessive buffering is the stringifier -> stdout step: the transform stream is successfully pulling the rows (objects) from the result stream as soon as they are received, causing the result stream to query for more. You should implement throttling in your transform stream, along the lines of the sketch below.
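A hedged sketch of such a throttled transform; the 1 ms delay and the stringification are illustrative only:

var stream = require('stream');

// Delay acknowledging each row so the result stream is not drained
// as fast as pages arrive.
var throttled = new stream.Transform({
  objectMode: true,
  transform: function (row, encoding, callback) {
    setTimeout(function () {
      callback(null, JSON.stringify(row) + '\n');
    }, 1);
  }
});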

About the implementation: the eachRow final callback is invoked once all the rows have been yielded.
The valve is set once all the rows from the result page have been pushed; it is called and reset when the stream's buffer of rows is empty.
As a timeline:
A. Push the rows of each page to the stream.
B. Once all the rows have been pushed and more pages are available, set the valve to trigger the query for the next page.
C. If the buffer is empty and the valve is set, call readNext() and remove it. Repeat A.
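A hypothetical rendering of that timeline, continuing the ResultStream sketch shown with the commit notes above; add, setValve, and _checkAndEmit are illustrative names that follow this discussion, not necessarily the driver's source:

ResultStream.prototype.add = function (row) {
  this.buffer.push(row);            // A: buffer rows from the current page
  this._checkAndEmit();
};

ResultStream.prototype.setValve = function (readNext) {
  this._readNext = readNext;        // B: arm the trigger for the next page
};

ResultStream.prototype._checkAndEmit = function () {
  while (!this.paused && this.buffer.length > 0) {
    this.paused = !this.push(this.buffer.shift());
  }
  if (!this.paused && !this.buffer.length && this._readNext) {
    var readNext = this._readNext;  // C: buffer drained, open the valve
    this._readNext = null;
    readNext();                     // queries the next page; repeat A
  }
};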

@ivan commented Oct 1, 2015

You should implement throttling in your transform stream.

Transform streams throttle automatically, though; that's a key advantage over writing a Readable yourself. Only when stdout is below its watermark does it ask for more data from stringifierStream, which asks for more data from cassandra-driver.

It's true that I might have no idea what's going on with the eachRow callback.

Anyway, I did test 59cd16e, and I saw the ResultStream's buffer.length climb to 310700 items. That should never happen, even if the consumer of the row stream is reading "too fast": cassandra-driver should wait for the ResultStream's buffer to drain before doing another query.

Sorry for the lack of a clean test case; this is part of a big piece of software.

@ivan commented Oct 1, 2015

Ah, I see that what I mentioned is what

if (!this.paused && !this.buffer.length && this._readNext) {

is trying to do, but I'm not sure why it didn't work for me.

@yeiniel commented Nov 6, 2015

This works really well. It handles ending the stream prematurely very well. I look forward to this branch being merged into master. Until then, we will need to clone this branch into our product instead of doing npm install.

@jorgebay (Contributor) commented Nov 6, 2015

@yeiniel thanks for the feedback! #111 is being merged into the 3.0 branch and will be released as part of 3.0.0-rc1.

@bkw commented Nov 20, 2015

I see behaviour similar to @ivan's with 3.0.0-rc1. I pipe a very long SELECT result stream through a transform stream and into a writable stream.

For analysis, I changed the writer's _write method to a simple setTimeout(callback, 0), simulating an asynchronous write. After a couple of minutes and about 20k rows, the process runs out of memory with the GC message below.
Removing the timeout and returning callback() directly makes everything pass.

The table consists of about 500m rows, but the OOM happens much earlier, at around 20k rows.

<--- Last few GCs --->

  181532 ms: Scavenge 1400.3 (1456.3) -> 1400.3 (1456.3) MB, 4.6 / 0 ms (+ 9.8 ms in 1 steps since last GC) [allocation failure] [incremental marking delaying mark-sweep].
  182300 ms: Mark-sweep 1400.3 (1456.3) -> 1399.2 (1455.3) MB, 768.0 / 0 ms (+ 545.0 ms in 870 steps since start of marking, biggest step 9.8 ms) [last resort gc].
  183062 ms: Mark-sweep 1399.2 (1455.3) -> 1398.8 (1456.3) MB, 762.1 / 0 ms [last resort gc].


<--- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x2465bd037399 <JS Object>
    1: DefineObjectProperty [native v8natives.js:~461] [pc=0x3720a805acc2] (this=0x2465bd0b7909 <JS Global Object>,J=0x2465bd0a0dc9 <a ResultSet with map 0xb3f4fd88681>,V=0x386e7fbe9751 <String[9]: pageState>,G=0x2465bd0a0de1 <a PropertyDescriptor with map 0x32f5d7f2bb91>,Y=0x2465bd0043f1 <true>)
    2: DefineOwnProperty(aka DefineOwnProperty) [native v8natives.js:~630] [pc=0x3720a7f54857] (thi...

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
Aborted (core dumped)

@bkw commented Nov 20, 2015

I condensed my problem into a more concise example, getting rid of the transform stream and basically just piping the stream into a writer that uses setTimeout. See here: https://gist.github.com/bkw/6c8d6d80e2d382ced59d
Any hints on what I'm doing wrong?

@jorgebay (Contributor)

@bkw
I think the problem is that you are using write(), which is a public method designed to be called directly.
You should use _write(), which is intended for stream implementers, something like:

var stream = require('stream');
var util = require('util');

function MyWritable(options) {
  stream.Writable.call(this, options);
}

util.inherits(MyWritable, stream.Writable);

MyWritable.prototype._write = function (row, encoding, callback) {
  // Invoking the callback signals that the write has completed.
  setTimeout(callback, 0);
};

With _write(), back pressure is handled by the callback, while with write() it is handled by the return value.
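For comparison, a sketch of driving a writable through write() and its return value; source stands in for any pausable data-event producer:

// With write(), the producer must watch the return value itself:
function pump(source, writable) {
  source.on('data', function (chunk) {
    if (!writable.write(chunk)) {
      source.pause(); // writable's buffer is full
      writable.once('drain', function () {
        source.resume();
      });
    }
  });
}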

@bkw commented Nov 23, 2015

Thanks for commenting, @jorgebay. I'm afraid that's not it, though. I'm just using the simplified constructor API for the example, as described here: https://nodejs.org/api/stream.html#stream_simplified_constructor_api

This does not actually call write(); it just sets up a writable stream, which internally does use _write().
The real version of the code does implement _write and shows the same behaviour.
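For reference, a minimal version of that simplified-constructor setup per the linked docs; objectMode is assumed here since the chunks are row objects:

var stream = require('stream');

// With the simplified constructor API, the `write` option becomes
// the stream's internal _write() implementation.
var writer = new stream.Writable({
  objectMode: true,
  write: function (row, encoding, callback) {
    setTimeout(callback, 0); // simulate an asynchronous write
  }
});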

ivan added a commit to ludiosarchive/terastash that referenced this pull request Nov 13, 2018
…ed by client.stream's lack of backpressure"

This reverts commit 6da836c.

The NODEJS-68 branch did not fix any problems:
datastax/nodejs-driver#89 (comment)