-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Plumb together Copy Redshift #2606
Conversation
} | ||
|
||
@Override | ||
public JsonNode toJdbcConfig(JsonNode redshiftConfig) { | ||
final String schema = Optional.ofNullable(redshiftConfig.get("schema")).map(JsonNode::asText).orElse("public"); | ||
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cgardens just a skeleton - does this seem like a reasonable approach to support both destinations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. this seems reasonable to me for keeping a clear separation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just dependency inject the 2 different destinations to make testing easier? then this outer RedshiftDestination is essentially just a decorator.
public RedshiftDestination() {
this(new RedshiftInsertDestination(), new RedshiftCopyDestination());
}
public RedshiftDestination(Destination insertDestination, Destination copyDestination) {
...etc
}
@cgardens want to take another pass? No test yet, and still some possible code clean up - but this is working! |
@@ -45,6 +45,34 @@ | |||
"description": "Whether or not to normalize the data in the destination. See <a href=\"https://docs.airbyte.io/architecture/basic-normalization\">basic normalization</a> for more details.", | |||
"title": "Basic Normalization", | |||
"examples": [true, false] | |||
}, | |||
"access_key_id": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was running into some strange json schema stuff so put that on pause now - will relook tomorrow
@@ -45,6 +45,54 @@ | |||
"description": "Whether or not to normalize the data in the destination. See <a href=\"https://docs.airbyte.io/architecture/basic-normalization\">basic normalization</a> for more details.", | |||
"title": "Basic Normalization", | |||
"examples": [true, false] | |||
}, | |||
"s3_bucket_name": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I spent 2 hours or so running into some frontend weirdness with spec.json rendering. I decided to cut my losses and go with this for now. What we lose is 1 additional authentication method (using ARN roles) + some JSON niceties. Not a big deal some Singer doesn't support the arn role too and I've compensated for this with a more descriptive JSON schema object. I feel it's more valuable to release what we have now and bolt the rest of this on later. I'm also ready to stop working on Redshift :P Does that sound sane?
Any bad input is caught by the https://github.com/airbytehq/airbyte/pull/2606/files#diff-719915fe360d60f49387522687fbde9a7ae807bc837c319edb242b977359e417R202 for the time being. Will write a ticket describing what I was trying to do and what did/did not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL @cgardens @sherifnada. This is 90% there. The only thing left is 1/2 test cases on the |
airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json
Show resolved
Hide resolved
"secret_access_key": { | ||
"type": "string", | ||
"description": "The corresponding secret to the above access key id.", | ||
"title": "S3 Access Key" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"title": "S3 Access Key" | |
"title": "S3 Access Key", | |
"airbyte_secret": true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
...hift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java
Outdated
Show resolved
Hide resolved
super(DRIVER_CLASS, new RedshiftSQLNameTransformer(), new RedshiftSqlOperations()); | ||
@Override | ||
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { | ||
if (hasCopyConfigs(config)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be good to DRY this if block into a method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I played around with this and the other approach I came up with was DRY but also somewhat complicated and I prefer this form. I'm going to merge this in first to close out the ticket. Happy to iterate on this with you outside. Do you have a particular structure in mind?
...hift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java
Outdated
Show resolved
Hide resolved
...hift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestination.java
Show resolved
Hide resolved
this.secretAccessKey = config.get("secret_access_key").asText(); | ||
} | ||
|
||
public static boolean isPresent(JsonNode config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not move this check to the constructor and throw an IllegalArgumentException or some other checked exception? this way we don't have to remember to call isPresent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function does validation and is also used by the RedshiftDestination
to decide which underlying strategy to use.
moving this to the constructor works, however it feels strange to be switching on an exception instead of an explicit check. I prefer this form. again, will merge in for now and iterate on your outside if you feel strongly.
/test connector=connectors/desintation-redshift
|
/test connector=connectors/destination-redshift
|
/test connector=connectors/destination-redshift
|
/publish connector=connectors/destination-redshift
|
What
Describe what the change is solving
Follow up to #2547.
Plumb everything together to allow a user to, via configuration, pick either the less efficient and easier to set up INSERT strategy, or the more efficient and slightly more set up COPY strategy.
Tests gives us the following results:
Effectiveness of COPY is obvious with larger and larger number of rows for a rough 3 - 4 x improvement. Logs show we are now effectively read constrained.
How
Modify the JSON spec to take additional S3 configuration. The
RedshiftDestination
is split into theInsertDestination
and theCopyDestination
. TheRedshiftDestination
picks theCopyDestination
if this S3 configuration exists, falling back to theInsertDestination
otherwise.The original
RedshiftDestination
is now theRedshiftInsertDestination
class - all that previous functionality is identical.Pre-merge Checklist
Recommended reading order
RedshiftDestination
- to understand how the underlying destination is picked.RedshiftCopyDestination
- to understand how the new copy strategy functions.