Cassandra: check isFullyFetched before completion #1935
Can you come up with a test case that shows the broken and now fixed behaviour?
@ennru I can try, but this was already hard to reproduce. Are there any existing tests for Cassandra?
Yes, they use Cassandra via Docker Compose.
@ennru Where can I find them? There's nothing in https://github.com/akka/alpakka/tree/master/cassandra/src/test
@ennru Tnx. For some reason I assumed the purpose of that was more or less just the docs and hadn't even considered it a working spec 🤷♂️. Well, we all know where assumptions lead 😂. I'll try to add a test case for this, although I suspect it will be very flaky at best.
I finally got some time to work on this today, and I was not able to reproduce the issue locally with a spec. The conditions under which I originally encountered it were reading ~10k rows containing quite large JSON documents (around 150 MB for the whole dataset). I'm not sure what exactly led to the buffer being exhausted prematurely. There is, however, an existing test that generally ensures this change doesn't break any functionality.
LGTM.
```scala
// fetch next page
val gFut = rs.fetchMoreResults()
val exec = materializer.executionContext
// the result is handed to the async handler futFetchedCallback when the future completes
GuavaFutures.invokeTryCallback(gFut, exec)(futFetchedCallback)
```
Should this be using an async callback?
It looks OK to me; it is only called within synchronous code. tryPushAfterFetch is called via the async handler futFetchedCallback when the future returns.
Yes, I just refactored it out into a method to not duplicate code.
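To illustrate why the reviewers were comfortable here: in Akka Streams, an async callback obtained from a stage may be invoked from any thread, and the stage then handles the event on its own thread, so the future's completion handler never touches stage state directly. The following is a hypothetical, plain-Scala model of that idea only. MiniStage, its queue, and the row counter are assumptions for illustration and are not Akka's AsyncCallback implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Hypothetical model of the async-callback pattern (NOT Akka's implementation).
final class MiniStage {
  // Results posted from other threads wait here until the stage handles them.
  private val events = new LinkedBlockingQueue[Try[Int]]()
  // Mutable stage state: only touched by the thread that calls processOne().
  private var fetchedRows = 0

  // Safe to hand to a Future: it only enqueues and never touches fetchedRows.
  val fetchedCallback: Try[Int] => Unit = result => events.put(result)

  // Runs on the stage's own thread and handles exactly one queued result.
  def processOne(): Unit = events.take() match {
    case Success(n) => fetchedRows += n
    case Failure(e) => throw e
  }

  def rows: Int = fetchedRows
}

object MiniStageDemo {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val stage = new MiniStage
    // Simulate an asynchronous page fetch completing on a pool thread.
    Future(100).onComplete(stage.fetchedCallback)
    stage.processOne() // blocks until the callback has enqueued the result
    println(stage.rows) // prints 100
  }
}
```

The design point is that the future's thread only publishes an event; all state changes happen in one place, which is what makes calling the callback from a Guava or Scala future safe.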
Thank you for this fix.
Purpose
Under some conditions, the source completed before all results were loaded from the result set.
Now it completes only when rs is fully fetched and all elements have been emitted. Previously, it completed whenever the currently fetched buffer was empty.
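The completion rule described above can be sketched as a pure function. This is a simplified, hypothetical model rather than the stage's actual code: available stands in for the driver's getAvailableWithoutFetching, fullyFetched for rs.isFullyFetched, and the Action type and the before/after rules are illustrative assumptions based only on the description in this PR.

```scala
// Hypothetical, simplified model of the stage's decision when downstream pulls.
sealed trait Action
case object Push extends Action      // emit a row that is already buffered
case object FetchMore extends Action // request the next page (rs.fetchMoreResults())
case object Complete extends Action  // complete the source

object CompletionRule {
  // Old rule, as described in the PR: an empty local buffer meant "done".
  def before(available: Int, fullyFetched: Boolean): Action =
    if (available > 0) Push else Complete

  // Fixed rule: complete only when nothing is buffered AND no pages remain.
  def after(available: Int, fullyFetched: Boolean): Action =
    if (available > 0) Push
    else if (fullyFetched) Complete
    else FetchMore
}
```

The only difference is the middle branch: with an empty buffer but pages still outstanding, the fixed rule asks for more data instead of completing.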
References
References #1934
Changes
rs.isFullyFetched
Background Context
I observed this issue where fetchSize was set relatively low and the downstream consumer was fast.
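A small simulation, under stated assumptions, shows why a low fetchSize combined with a fast consumer exposes the old rule. FakePagedResult and Drain are hypothetical stand-ins, not driver or Alpakka classes, and the model assumes the next page has not arrived by the time the local buffer runs dry (represented here by fetching pages only on demand).

```scala
// Hypothetical stand-in for a paged result set (not the driver's ResultSet).
final class FakePagedResult(totalRows: Int, fetchSize: Int) {
  private var fetched = 0 // rows loaded from the "server" so far
  private var buffer = List.empty[Int]

  fetchMore() // the first page arrives together with the query result

  def available: Int = buffer.size
  def fullyFetched: Boolean = fetched >= totalRows

  // Loads the next page of at most fetchSize rows into the local buffer.
  def fetchMore(): Unit = {
    val next = math.min(fetchSize, totalRows - fetched)
    buffer = buffer ++ (fetched until fetched + next)
    fetched += next
  }

  def poll(): Int = { val h = buffer.head; buffer = buffer.tail; h }
}

object Drain {
  // A fast consumer: emits rows until the completion rule says to stop.
  // checkFullyFetched = false models the old rule, true models the fix.
  def run(rs: FakePagedResult, checkFullyFetched: Boolean): Vector[Int] = {
    var out = Vector.empty[Int]
    var done = false
    while (!done) {
      if (rs.available > 0) out :+= rs.poll()
      else if (checkFullyFetched && !rs.fullyFetched) rs.fetchMore()
      else done = true
    }
    out
  }
}
```

With 10 rows and a fetch size of 3, the old rule stops after the first page (rows 0 to 2), while checking the fully-fetched state emits all 10 rows.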