Workflow to handle operations (custom transformation) #3379
/test connector=connectors/destination-csv
/test connector=connectors/destination-local-json
/test connector=connectors/destination-meilisearch
/test connector=connectors/destination-mysql
/test connector=connectors/destination-snowflake
/test connector=connectors/destination-bigquery
/test connector=connectors/destination-postgres
/test connector=connectors/destination-redshift
   * settings and then run the custom transformation command.
   */
  public boolean run(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception {
    if (!normalizationRunner.configureDbt(jobId, attempt, jobRoot, config, dbtConfig)) {
unless we think the DbtTransformationRunner will rely on NormalizationRunner in the future, I feel pretty strongly that we should try and pull the shared configuration function out instead of injecting the NormalizationRunner into the DbtTransformationRunner. We would be able to get rid of the implicit relationship we now have.
Let me know if I'm missing something!
Because we have no control over the contents of the docker image used for the dbt transformations (it is specified by the user, so we don't know what software is installed and where, whether it has git, whether it has a Python environment, or which version), I am using the normalization docker image, where we install our own Python scripts with their correct dependencies.
I can then use these scripts to generate valid dbt profiles.yml files and pull code from the git repository to build the workspace that the user's docker image runs from (the same way we do it for running normalization).
All of this is highly dependent on how we build the normalization docker image, so I think it makes sense to leave it in the NormalizationRunner.
In the DbtTransformationRunner, I don't know how to map the destination docker image to the correct integration type, which normalization docker image to use, or how it is supposed to work.
All of those are resolved in the NormalizationRunner.
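To make the two-phase flow described in this comment concrete, here is a minimal sketch. The `DbtConfigurer` and `DbtImageLauncher` interfaces are hypothetical stand-ins (they are not classes from this PR), and the signatures are simplified to strings; only the ordering and the early return on failed configuration are taken from the `run` method shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class DbtRunSketch {

  // Hypothetical stand-in for the configuration step that runs in the normalization image
  // (generating profiles.yml and pulling the git repository into the workspace).
  interface DbtConfigurer {
    boolean configureDbt(String jobId, int attempt, String jobRoot) throws Exception;
  }

  // Hypothetical stand-in for launching the user-specified dbt docker image.
  interface DbtImageLauncher {
    boolean runDbt(String jobRoot) throws Exception;
  }

  static boolean run(DbtConfigurer configurer, DbtImageLauncher launcher, String jobId, int attempt, String jobRoot)
      throws Exception {
    // Phase 1: prepare the workspace with an image whose contents we control.
    if (!configurer.configureDbt(jobId, attempt, jobRoot)) {
      return false; // skip the user's image if the workspace could not be prepared
    }
    // Phase 2: run the user's image against the prepared workspace.
    return launcher.runDbt(jobRoot);
  }

  public static void main(String[] args) throws Exception {
    final List<String> calls = new ArrayList<>();
    final boolean ok = run(
        (jobId, attempt, jobRoot) -> { calls.add("configure"); return true; },
        jobRoot -> { calls.add("dbt"); return true; },
        "1", 0, "/tmp/job");
    System.out.println(ok + " " + calls);
  }
}
```

The point of the sketch is only that the user's image never has to provide git or Python itself; everything it needs is already in the workspace by the time it starts.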
makes sense! I didn't understand everything when I reviewed this. your decision makes sense. thanks for explaining.
@@ -62,16 +63,30 @@ public DefaultNormalizationRunner(final DestinationType destinationType, final P
    this.pbf = pbf;
  }

  @Override
  public boolean configureDbt(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception {
same thoughts on this.
   *
   * @param config destination json config object
   * @return the value of the basic normalization key
   *
this is for backward compatibility and should go away once Subodh makes his change?
yes, it should be done for #3328
   * @throws Exception - any exception thrown from configuration will be handled gracefully by the
   *         caller.
   */
  boolean configureDbt(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception;
Same thoughts on this moving to the DbtTransformationRunner.
        }
      }
    } else {
      // TODO chris: Normalization operations should be defined at connection level in the
Nice!
Looks good!
One thing I want to resolve before approving is the question of pulling the normalisation runner out of the dbt transformation runner. None of my other comments are blocking, since they are minor questions for my own understanding/readability.
There were a lot of files, so I focused on general understanding and organisation. Let me know if there are areas you would like more feedback on.
It'll be nice to have a guide of what files to read in the future (especially since this was a big change) :)
Looks good! Feel free to merge whenever.
@@ -86,7 +87,7 @@ public ProcessBuilder create(String jobId, int attempt, final Path jobRoot, fina
        "--restart=Never",
        "--overrides=" + overrides, // fails if you add quotes around the overrides string
        podName);

    // TODO handle entrypoint override (to run DbtTransformationRunner for example)
You'll likely need to override cmd and args in the pod's spec in kube_runner_template.yaml
I opened this issue for now for kube version of this: #3441
@@ -2410,6 +2410,7 @@ <h3 class="field-label">Example data</h3>
    "option" : "basic"
  },
  "dbt" : {
    "gitRepoBranch" : "gitRepoBranch",
Why? People do use our generated API docs. We should try to keep them generating?
public class DbtTransformationRunner implements AutoCloseable {

  private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
  private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
It'd probably be safer to use absolute paths so the entrypoint isn't dependent on the working directory.
This constant defines the filename, so it only drives how the script file is named.
When the script is invoked, it is called with its absolute path, so there is no dependency on the working directory.
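A minimal sketch of that distinction, assuming the script is written under the job root and then invoked by its resolved path. The class `EntrypointPathSketch` and the method `writeEntrypoint` are hypothetical and not part of this PR; only the constant name and value come from the diff.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class EntrypointPathSketch {

  // Only the file name, mirroring the DBT_ENTRYPOINT_SH constant in the diff.
  private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";

  // Writes the script under jobRoot and returns the absolute path to invoke it with.
  static Path writeEntrypoint(Path jobRoot, String scriptContents) throws IOException {
    final Path script = jobRoot.resolve(DBT_ENTRYPOINT_SH);
    Files.write(script, scriptContents.getBytes(StandardCharsets.UTF_8));
    return script.toAbsolutePath();
  }

  public static void main(String[] args) throws IOException {
    // A temporary directory stands in for the real job root.
    final Path jobRoot = Files.createTempDirectory("job-root");
    final Path entrypoint = writeEntrypoint(jobRoot, "#!/bin/sh\necho hello\n");
    System.out.println(entrypoint.isAbsolute()); // prints "true"
  }
}
```

Because the caller always receives an absolute path, the process can be started from any working directory without changing which script runs.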
…Workflow.java Co-authored-by: Jared Rhizor <jared@dataline.io>
* Requirements updated to CDK. airbyte-protocol and base-python requirements removed. * Bugfix: BufferedStreamConsumer. (#3387) * Format. * Bump versions. * main_dev.py renamed to main.py README.md updated * Source Stripe: Add Acceptance Tests to Stripe Connector (#3367) * Add Acceptance Tests to Stripe Connector * move configured_catalog.json to sample_files * bump version Co-authored-by: ykurochkin <y.kurochkin@zazmic.com> * Legacy lib references removed * FB Marketing source - lookback window logic not functioning correctly * FB Marketing source #1390 - returning buffered record while incremental sync * FB Marketing source #1390 - improving checking while syncing buffered record * FB Marketing source #1390 - adding loop_back to IncrementalStreamAPI * FB Marketing source #1390 - bump version * FB Marketing source #1390 - add CHANGELOG.md * Stop formatting python with spotless (#3388) * add test that migration output schema same as source schema (#3356) * Add updated architecture diagram to high level docs. (#3399) * Add updated architecture doc to high level docs. * Address review comments Co-authored-by: Abhi Vaidyanatha <abhivaidyanatha@Abhis-MacBook-Pro.local> * Correct GA readme error. (#3407) * make shopify more resilient to timeouts (#3409) * Update migration schema to include recent changes to the StandardSync object. (#3414) * Update all of Pydantic to 1.6.2 per Dependabot. (#3408) * Update all to 1.6.2. * Publish new airbyte-cdk version. * Use repr instead of str for exceptions. * Use rc. * Edit test. * Bump for SAT. * Format. * Docker ignore update. Fix setup.py * fixing ONLY problematic fields in freshdesk JSON schemas (#3376) * bump airbyte-webapp version (#2266) * add configuration for bumping webapp versionn * set to current version * Bump version: 0.16.0-alpha → 0.16.1-alpha * Revert "Bump version: 0.16.0-alpha → 0.16.1-alpha" Thiss reverts commit fdbf6dc. 
* also update package lock so we don't run into files changed errors * use 0.19.0-alpha * add npm webapp version * Add a CDK speedrun tutorial doc (#3403) * Add CDK Speedrun document. * Finish speedrun doc. * Address review comments * Add to SUMMARY.md Co-authored-by: Abhi Vaidyanatha <abhivaidyanatha@Abhis-MacBook-Pro.local> * Add Rust as a connector specific dependency to source-file (#3426) * Add Rust as a connector specific dependency to source-file * Add more details about installation. * Markdown lines are weird. Co-authored-by: Abhi Vaidyanatha <abhivaidyanatha@Abhis-MacBook-Pro.local> * API update to latest airbyte-cdk version * Add section Deploy Local on Windows (#3425) * add deploy on windows steps * correct minor * change suggestions by @avaidyanatha * GitBook: [master] 161 pages and 75 assets modified * Display icons (#3140) * Display icons * Improve icons views * MS SQL Server Destination implementation Fixes issue #613. Normalization is not yet enabled. This will have to be added at a later point. * Workflow to handle operations (custom transformation) (#3379) * Keep normalization backward compatible with old settings from destination * Bumpversion normalization image * add npm install before all npm run generates' (#3442) * restart containers if they fail automatically (#3423) * Update link for contribution scheduling (#3443) * Address issue with icon in onboarding (#3437) * rename toy connector tutorial to "Build a connector the hard way" (#3421) * Upload test reports (from integration test slash commands) as GitHub artifacts (#3416) * Archive test reports in github workflow * Archive Test reports only when failures * Fixing SqlServerOperations.java (#3454) Fixing some issues with `SqlServerOperations`, which was out of sync with recent changes to `SqlOperations`. 
* Add redirect to cdk tutorial page (#3456) * add redirect to cdk tutorial page * change path to cdk README.md Co-authored-by: Davin Chia <davinchia@gmail.com> Co-authored-by: Yevhenii <34103125+yevhenii-ldv@users.noreply.github.com> Co-authored-by: ykurochkin <y.kurochkin@zazmic.com> Co-authored-by: vitaliizazmic <75620293+vitaliizazmic@users.noreply.github.com> Co-authored-by: Charles <giardina.charles@gmail.com> Co-authored-by: Abhi Vaidyanatha <abhi@airbyte.io> Co-authored-by: Abhi Vaidyanatha <abhivaidyanatha@Abhis-MacBook-Pro.local> Co-authored-by: Jared Rhizor <jared@dataline.io> Co-authored-by: vovavovavovavova <39351371+vovavovavovavova@users.noreply.github.com> Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com> Co-authored-by: Marcos Marx <marcos@airbyte.io> Co-authored-by: Artem Astapenko <3767150+Jamakase@users.noreply.github.com> Co-authored-by: masonwheeler <masonwheeler@yahoo.com> Co-authored-by: Christophe Duong <christophe.duong@gmail.com> Co-authored-by: Sherif A. Nada <snadalive@gmail.com> Co-authored-by: Michel Tricot <michel@dataline.io>
    if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) {
      for (StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) {
        if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) {
          final NormalizationInput normalizationInput = new NormalizationInput()
              .withDestinationConfiguration(syncInput.getDestinationConfiguration())
              .withCatalog(run.getOutputCatalog());

          normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput);
        } else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) {
          final OperatorDbtInput operatorDbtInput = new OperatorDbtInput()
              .withDestinationConfiguration(syncInput.getDestinationConfiguration())
              .withOperatorDbt(standardSyncOperation.getOperatorDbt());

          dbtTransformationActivity.run(jobRunConfig, destinationLauncherConfig, operatorDbtInput);
        } else {
          final String message = String.format("Unsupported operation type: %s", standardSyncOperation.getOperatorType());
          LOGGER.error(message);
          throw new IllegalArgumentException(message);
        }
      }
    } else {
This triggers the corresponding Temporal Activity for each operation defined in the settings of the connection/sync.
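The dispatch pattern can be reduced to a small self-contained sketch. The enum and class below are hypothetical stand-ins rather than the real `OperatorType` or workflow classes, the activities are replaced by recorded names, and the legacy `else` branch for an empty sequence is simplified to returning nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class OperationDispatchSketch {

  // Hypothetical stand-in for the operator types seen in the diff; UNKNOWN exists only
  // to exercise the unsupported branch.
  enum OperatorType { NORMALIZATION, DBT, UNKNOWN }

  // Returns the names of the activities that would be triggered, in sequence order.
  static List<String> dispatch(List<OperatorType> operationSequence) {
    final List<String> triggered = new ArrayList<>();
    if (operationSequence == null || operationSequence.isEmpty()) {
      return triggered; // the real workflow takes a separate branch here, not shown in this sketch
    }
    for (OperatorType type : operationSequence) {
      switch (type) {
        case NORMALIZATION:
          triggered.add("normalize");
          break;
        case DBT:
          triggered.add("dbt");
          break;
        default:
          throw new IllegalArgumentException(String.format("Unsupported operation type: %s", type));
      }
    }
    return triggered;
  }

  public static void main(String[] args) {
    // prints "[normalize, dbt]"
    System.out.println(dispatch(List.of(OperatorType.NORMALIZATION, OperatorType.DBT)));
  }
}
```

Failing fast on an unknown type, as the diff does, means a newly added operator cannot be silently skipped by a worker that does not yet know how to run it.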
What
Closes #3236
How
Re-use normalization docker image to generate the dbt settings to connect to destination
then run dbt with custom project files from a git url
Pre-merge Checklist
Recommended reading order
Changes to Normalization image & Runner:
airbyte-integrations/bases/base-normalization/entrypoint.sh
airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java
New changes for Custom Operations:
airbyte-workers/src/main/resources/dbt_transformation_entrypoint.sh
airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java
Change to workflow:
airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java