snowflake s3 copy & redshift s3 refactor #2921
I think I followed what's going on here. I think you're adding some solid abstractions to keep the file upload approach lean for future dbs.
I'm requesting changes because there are a few places where I think we can improve the clarity of how things are working. The main things are:
- the responsibilities / lifecycle of the copier, in other words all of the steps that are involved
- understanding the delegate concept as used in the copier.
if you plan to address all of these things in the refactor you're working on, just lmk and i can approve this one and we can figure out the clarity stuff in the next PR. i definitely want to take another look when the clarity stuff is addressed. that all being said, i think the fundamental approach here is spot on.
/test connector=connectors/destination-snowflake
nice! much clearer now.
I had a few more comments about structure but I don't want to block. Happy to talk about them more if they are interesting.
...nation-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java
   *
   * @return the SQL queries necessary to merge or the empty string if there was a failure
   */
  String copyToTmpTableAndPrepMergeToFinalTable(boolean hasFailed) throws Exception;
i'm still not in love with this method. would it make sense to split it up? looking at the impl it kinda looks like that's how you think about it anyway.
- closeWriter/closerConsumer
- copyDataToTmpTable
- generateMergeStatement
dunno. i might be off course here.
From a developer perspective I really don't know what I should do with the function.
Also, because it has so many responsibilities that can fail, all the error handling will have to be done in every implementation instead of in the consumer of that interface.
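A minimal sketch of how the two suggestions above could fit together, assuming the method is split into single-purpose steps and the consumer of the interface owns the control flow and error handling. Only the step names closeWriter, copyDataToTmpTable, and generateMergeStatement come from the comments above; `StepwiseCopier`, `closeAll`, `cleanUp`, and `RecordingCopier` are hypothetical names made up for illustration, and this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyFlowSketch {

  /** Hypothetical split of copyToTmpTableAndPrepMergeToFinalTable into single-purpose steps. */
  interface StepwiseCopier {
    void closeWriter(boolean hasFailed) throws Exception;
    void copyDataToTmpTable() throws Exception;
    String generateMergeStatement() throws Exception;
    void cleanUp() throws Exception; // assumed extra step: drop tmp table / staged file
  }

  /**
   * Consumer-side lifecycle: implementations only do the work of each step,
   * while ordering, the failure short-circuit, and cleanup live here once.
   * Returns the merge SQL, or the empty string if the sync had already failed.
   */
  static String closeAll(List<StepwiseCopier> copiers, boolean hasFailed) throws Exception {
    StringBuilder mergeSql = new StringBuilder();
    try {
      for (StepwiseCopier copier : copiers) {
        copier.closeWriter(hasFailed);
      }
      if (!hasFailed) {
        for (StepwiseCopier copier : copiers) {
          copier.copyDataToTmpTable();
          mergeSql.append(copier.generateMergeStatement());
        }
      }
      return mergeSql.toString();
    } finally {
      // Cleanup always runs; a cleanup failure is reported but does not mask the original error.
      for (StepwiseCopier copier : copiers) {
        try {
          copier.cleanUp();
        } catch (Exception e) {
          System.err.println("cleanup failed: " + e.getMessage());
        }
      }
    }
  }

  /** Toy implementation that records which steps were called. */
  static final class RecordingCopier implements StepwiseCopier {
    final List<String> calls = new ArrayList<>();
    private final String table;
    private final boolean failOnCopy;

    RecordingCopier(String table, boolean failOnCopy) {
      this.table = table;
      this.failOnCopy = failOnCopy;
    }

    @Override public void closeWriter(boolean hasFailed) { calls.add("closeWriter"); }

    @Override public void copyDataToTmpTable() {
      calls.add("copyDataToTmpTable");
      if (failOnCopy) {
        throw new IllegalStateException("copy failed for " + table);
      }
    }

    @Override public String generateMergeStatement() {
      calls.add("generateMergeStatement");
      return "INSERT INTO " + table + " SELECT * FROM " + table + "_tmp;\n";
    }

    @Override public void cleanUp() { calls.add("cleanUp"); }
  }

  public static void main(String[] args) throws Exception {
    RecordingCopier users = new RecordingCopier("users", false);
    System.out.print(closeAll(List.of(users), false));
    System.out.println(users.calls);
  }
}
```

With this shape, a new destination only implements the individual steps; whether the extra `cleanUp` step belongs on the interface at all is a design choice this sketch assumes rather than something the thread settles.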
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.SQLException;

public class SnowflakeS3StreamCopier extends S3StreamCopier {
maybe purely a style thing, but what about just doing a CopyFromStorageToTable iface and use that for this part as opposed to extending S3StreamCopier? Similar to JdbcStreamingQueryConfiguration. just a thought.
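A rough sketch of what that composition could look like, assuming the only destination-specific part is the statement that loads the staged file into the tmp table. The `CopyFromStorageToTable` name is from the comment above, but its method signature, `StagedStreamCopier`, and both example strategies are hypothetical; the SQL strings are simplified illustrations with credentials and file-format options left out.

```java
public class CopyStrategySketch {

  /** Hypothetical signature: given a staged file, produce the statement that loads it. */
  @FunctionalInterface
  interface CopyFromStorageToTable {
    String copyStatement(String stagedFileUrl, String schema, String tmpTable);
  }

  /** Stand-in for a shared stream copier that is configured with a strategy instead of subclassed. */
  static final class StagedStreamCopier {
    private final String stagedFileUrl;
    private final CopyFromStorageToTable copyStrategy;

    StagedStreamCopier(String stagedFileUrl, CopyFromStorageToTable copyStrategy) {
      this.stagedFileUrl = stagedFileUrl;
      this.copyStrategy = copyStrategy;
    }

    String copyToTmpTableSql(String schema, String tmpTable) {
      return copyStrategy.copyStatement(stagedFileUrl, schema, tmpTable);
    }
  }

  // Illustrative only: real statements would also carry credentials and format options.
  static final CopyFromStorageToTable SNOWFLAKE_STYLE =
      (url, schema, table) -> String.format("COPY INTO %s.%s FROM '%s';", schema, table, url);

  static final CopyFromStorageToTable REDSHIFT_STYLE =
      (url, schema, table) -> String.format("COPY %s.%s FROM '%s' CSV;", schema, table, url);

  public static void main(String[] args) {
    StagedStreamCopier copier =
        new StagedStreamCopier("s3://example-bucket/users.csv", SNOWFLAKE_STYLE);
    System.out.println(copier.copyToTmpTableSql("public", "users_tmp"));
  }
}
```

The trade-off against subclassing is mostly stylistic, as the comment says: the strategy keeps the upload logic in one concrete class, while inheritance keeps each destination's pieces in one file.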
@@ -9,6 +9,10 @@ application {
}

dependencies {
  implementation 'org.apache.commons:commons-lang3:3.11'
  implementation 'org.apache.commons:commons-csv:1.4'
  implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
How reliable is that library?
It's a pretty lightweight library and is regularly updated: https://github.com/alexmojaki/s3-stream-upload
However, there aren't many public usages: https://github.com/search?q=alexmojaki+%22s3-stream-upload%22&type=code
I only used this since it was already part of @davinchia's Redshift implementation. I imagine if we do run into problems with it we can rip it out easily, so I'm not planning on changing it to something internal preemptively.
I changed the interface to move all of the control flow to
Which I think reads a lot better.
/test connector=connectors/destination-snowflake
/test connector=connectors/destination-redshift
/publish connector=connectors/destination-snowflake
/publish connector=connectors/destination-redshift
This PR provides the ability to create streaming writes to a file and manage the issuance of copy commands for the destination.
It introduces the concept of a SwitchingDestination and has tools for copying files to a specific staging environment (such as S3) that can be reused across destinations.
Recommended reading order:
- SnowflakeDestination
Remaining: