-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix JdbcSource OOM #1655
Fix JdbcSource OOM #1655
Conversation
@@ -4,7 +4,7 @@ | |||
"$schema": "http://json-schema.org/draft-07/schema#", | |||
"title": "Postgres Source Spec", | |||
"type": "object", | |||
"required": ["host", "password", "port", "database", "username"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
password isn't a required field in postgres. furthermore in practice, this makes it inconvenient to use most local databases.
@@ -89,8 +90,7 @@ public void execute(CheckedConsumer<Connection, SQLException> query) throws SQLE | |||
public <T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator, | |||
CheckedFunction<ResultSet, T, SQLException> recordTransform) | |||
throws SQLException { | |||
// make it lazy. | |||
return Stream.of(1).flatMap(i -> queryInternal(statementCreator, recordTransform)); | |||
return queryInternal(statementCreator, recordTransform); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we need queryInternal
in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! good finding!
closes #1582
What
java
flatMap
when converted into anIterator
ends up pulling all of the elements for any one execution into memory. e.g. for this code snippet:1,2,3 are all copied into a buffer before the iterator can ever call
.next()
. Obviously in this simple example where the the contents of the stream are already in memory that is no problem. If the internal collection is infinite (or really big), it means the data isn't processed in a streaming fashioned after the iterator. instead it is collected beforenext()
is called, causing an OOM before the data can be processed / output safely.This caused problems in our implementation of JdbcSource because we used:
to make the querying behavior lazy.
How
In order to get a reliable fix out, I am simply removing the lazy behavior in the
StreamingJdbcDatabase
. Because the query is already chunked, we do not gain much by executing it lazily. While we will eagerly pull the first chunk, subsequent chunks will not get pulled until the stream requests them. Thus there is no OOM risk and the eager behavior while unnecessary poses no danger. I can reproduce this issue using the JdbcStressTest (after adding incremental to it); it fails 100% of the time. Removing theflatMap
fixes the issue and the test passes.Added incremental to the stress test. Admittedly it is done in a less than ideal way, but in order to fix it, we would need to restructure the inheritance of those classes (so that we only wrote the seed data once) and it's not super worthwhile to do right now given other priorities.
Takeaways
flatMap
ped must fit entirely in memory.flatMap
which isn't a priority since we have a fine workaround.Pre-merge Checklist
Recommended reading order
airbyte-db/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java