-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dagster-managed-elements] Getting started guide (#10224)
* [dagster-managed-elements] Getting started guide * Start converting to airbyte * apidoc * link out to apidoc * First pass * Title + add Experimental callout Using the Experimental badge in headings causes link anchor issues. Moving it to a callout instead. * Prereqs * Adjust headings and dividers * Add Related section * Copy edits for grammar, spelling, clarity * Add to Integrations in sidenav * Fix links in Related * Reformat Related + add link to new Airbyte guide * add link in apidoc * docs rebuild * fix guide Co-authored-by: Erin Cochran <erin.k.cochran@gmail.com>
- Loading branch information
Showing
9 changed files
with
345 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
Large diffs are not rendered by default.
Oops, something went wrong.
215 changes: 215 additions & 0 deletions
215
docs/content/guides/dagster/airbyte-ingestion-as-code.mdx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,215 @@ | ||
--- | ||
title: Airbyte ingestion as code | Dagster Docs | ||
--- | ||
|
||
# Airbyte ingestion as code | ||
|
||
<Note> | ||
This feature is considered <strong>experimental</strong>. | ||
</Note> | ||
|
||
This guide provides an introduction to using Dagster to configure your [Airbyte](/integrations/airbyte) connections. This allows you to centralize the configuration for your data stack, specifying configuration in Python code. You can check-in and version your config with version control or programmatically generate config for more complex use cases. | ||
|
||
--- | ||
|
||
## Prerequisites | ||
|
||
To use this feature, you'll need to install the `dagster-airbyte` and `dagster-managed-elements` libraries: | ||
|
||
```bash | ||
pip install dagster-airbyte dagster-managed-elements | ||
``` | ||
|
||
Available as `dagster-me`, the `dagster-managed-elements` library includes the base config reconciliation APIs and a CLI. | ||
|
||
--- | ||
|
||
## Step 1: Define a reconciler | ||
|
||
The config for your Airbyte instance is specified in an `AirbyteManagedElementReconciler`, which is pointed at a specific Airbyte instance using an Airbyte resource. The config is also provided with a list of connections to reconcile, which we'll set up later in the guide. | ||
|
||
```python startafter=start_define_reconciler endbefore=end_define_reconciler file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4 | ||
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource | ||
|
||
airbyte_instance = airbyte_resource.configured( | ||
{ | ||
"host": "localhost", | ||
"port": "8000", | ||
# If using basic auth, include username and password: | ||
"username": "airbyte", | ||
"password": {"env": "AIRBYTE_PASSWORD"}, | ||
} | ||
) | ||
|
||
airbyte_reconciler = AirbyteManagedElementReconciler( | ||
airbyte=airbyte_instance, | ||
connections=[], | ||
) | ||
``` | ||
|
||
For more info on setting up an Airbyte resource, refer to [the Airbyte guide](/integrations/airbyte#step-1-connecting-to-airbyte). Additional configuration options for the reconciler are [detailed below](#additional-configuration-options). | ||
|
||
For more information on setting up secrets from the environment, refer to the [Environment variables and secrets guide](/guides/dagster/using-environment-variables-and-secrets). | ||
|
||
--- | ||
|
||
## Step 2: Define sources and destinations | ||
|
||
Next, we'll define our sources and destinations. Sources and destinations can be constructed manually using the `AirbyteSource` and `AirbyteDestination` classes, respectively, but `dagster-airbyte` also provides generated, typed classes for specific source and destination types. Refer to the [Airbyte API docs](/\_apidocs/libraries/dagster-airbyte#managed-config-generated-sources) for the properties required to configure each source and destination type. | ||
|
||
In this example, we'll configure a source that reads from a hosted CSV file and a destination that writes it to a local JSON file. To do this, we'll import the generated classes for the `File` source and `Local JSON` destination: | ||
|
||
```python startafter=start_define_sources endbefore=end_define_sources file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4 | ||
from dagster_airbyte.managed.generated.sources import FileSource | ||
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination | ||
|
||
cereals_csv_source = FileSource( | ||
name="cereals-csv", | ||
url="https://docs.dagster.io/assets/cereal.csv", | ||
format="csv", | ||
provider=FileSource.HTTPSPublicWeb(), | ||
dataset_name="cereals", | ||
) | ||
|
||
local_json_destination = LocalJsonDestination( | ||
name="local-json", | ||
destination_path="/local/cereals_out.json", | ||
) | ||
``` | ||
|
||
--- | ||
|
||
## Step 3: Define a connection | ||
|
||
Next, we'll define a connection between the source and destination using the [`AirbyteConnection`](/\_apidocs/libraries/dagster-airbyte#dagster_airbyte.AirbyteConnection) class: | ||
|
||
```python startafter=start_define_connection endbefore=end_define_connection file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4 | ||
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode | ||
|
||
cereals_connection = AirbyteConnection( | ||
name="download-cereals", | ||
source=cereals_csv_source, | ||
destination=local_json_destination, | ||
stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()}, | ||
) | ||
``` | ||
|
||
Then, we'll supply the new connection to the reconciler we defined in [Step 1](#step-1-define-a-reconciler): | ||
|
||
```python startafter=start_new_reconciler endbefore=end_new_reconciler file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4 | ||
airbyte_reconciler = AirbyteManagedElementReconciler( | ||
airbyte=airbyte_instance, | ||
connections=[cereals_connection], | ||
) | ||
``` | ||
|
||
--- | ||
|
||
## Step 4. Validate changes | ||
|
||
Next, we'll use the `dagster-me` CLI to sanity-check our reconciler and apply any changes. | ||
|
||
The `check` command prints out differences between the current state of the Airbyte instance and the desired state specified in the reconciler. To invoke the CLI, point it at a module containing the reconciler: | ||
|
||
```bash | ||
dagster-me check --module my_python_module.my_submodule:reconciler | ||
|
||
Found 1 reconciler, checking... | ||
+ cereals-csv: | ||
+ url: https://docs.dagster.io/assets/cereal.csv | ||
+ format: csv | ||
+ dataset_name: cereals | ||
+ provider: | ||
+ user_agent: False | ||
+ storage: HTTPS | ||
+ local-json: | ||
+ destination_path: /local/cereals_out.json | ||
+ download-cereals: | ||
+ source: cereals-csv | ||
+ destination: local-json | ||
+ normalize data: None | ||
+ streams: | ||
+ cereals: FULL_REFRESH_OVERWRITE | ||
``` | ||
|
||
--- | ||
|
||
## Step 5. Apply changes | ||
|
||
As the changes printed out by the `check` command in the previous step look like what we expect, we can now apply them: | ||
|
||
```bash | ||
dagster-me apply --module my_python_module.my_submodule:reconciler | ||
``` | ||
|
||
Now, we should see our new connection in the Airbyte UI: | ||
|
||
<Image | ||
alt="instance-overview" | ||
src="/images/guides/airbyte-ingestion-as-code/connection-in-ui.png" | ||
width={755} | ||
height={486} | ||
/> | ||
|
||
--- | ||
|
||
## Step 6. Load connections into Dagster | ||
|
||
To load managed connections into Dagster, use the `load_assets_from_connections` utility method. This functions similarly to [`load_assets_from_airbyte_instance`](/integrations/airbyte#loading-airbyte-asset-definitions-from-an-airbyte-instance), but validates that the connections passed in match the connections defined in your Airbyte instance: | ||
|
||
```python startafter=start_load_assets endbefore=end_load_assets file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4 | ||
from dagster_airbyte import load_assets_from_connections, airbyte_resource | ||
|
||
airbyte_instance = airbyte_resource.configured( | ||
{ | ||
"host": "localhost", | ||
"port": 8000, | ||
# If using basic auth, include username and password: | ||
"username": "airbyte", | ||
"password": {"env": "AIRBYTE_PASSWORD"}, | ||
} | ||
) | ||
|
||
airbyte_assets = load_assets_from_connections( | ||
airbyte=airbyte_instance, connections=[cereals_connection] | ||
) | ||
``` | ||
|
||
For more info on managing Airbyte assets in Dagster, refer to the [Airbyte guide](/integrations/airbyte). | ||
|
||
--- | ||
|
||
## Additional configuration options | ||
|
||
The Airbyte reconciler also supports some additional configuration options, which can be passed to the `AirbyteManagedElementReconciler` constructor. | ||
|
||
By default, the reconciler will not modify any sources, destinations, or connections which are outside of those specified in the reconciler. This allows you to adopt the reconciler incrementally, without having to reconcile all of your existing Airbyte configuration. | ||
|
||
To reconcile all of your existing Airbyte configuration, pass `delete_unmentioned_resources=True` to the reconciler constructor: | ||
|
||
```python startafter=start_new_reconciler_delete endbefore=end_new_reconciler_delete file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4 | ||
airbyte_reconciler = AirbyteManagedElementReconciler( | ||
airbyte=airbyte_instance, connections=[...], delete_unmentioned_resources=True | ||
) | ||
``` | ||
|
||
This tells the reconciler to clean up any sources, destinations, or connections which are not explicitly defined in Python code. | ||
|
||
--- | ||
|
||
## Related | ||
|
||
<ArticleList> | ||
<ArticleListItem | ||
href="/integrations/airbyte" | ||
title="Airbyte + Dagster guide" | ||
></ArticleListItem> | ||
<ArticleListItem | ||
href="/\_apidocs/libraries/dagster-airbyte#dagster_airbyte" | ||
title="dagster-airbyte API reference" | ||
></ArticleListItem> | ||
<ArticleListItem | ||
href="/guides/dagster/using-environment-variables-and-secrets" | ||
title="Environment variables and secrets" | ||
></ArticleListItem> | ||
</ArticleList> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file added
BIN
+202 KB
docs/next/public/images/guides/airbyte-ingestion-as-code/connection-in-ui.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
83 changes: 83 additions & 0 deletions
83
examples/docs_snippets/docs_snippets/guides/dagster/ingestion_as_code/airbyte.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# isort: skip_file | ||
# pylint: disable=unused-variable | ||
|
||
|
||
def scope_define_reconciler(): | ||
# start_define_reconciler | ||
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource | ||
|
||
airbyte_instance = airbyte_resource.configured( | ||
{ | ||
"host": "localhost", | ||
"port": "8000", | ||
# If using basic auth, include username and password: | ||
"username": "airbyte", | ||
"password": {"env": "AIRBYTE_PASSWORD"}, | ||
} | ||
) | ||
|
||
airbyte_reconciler = AirbyteManagedElementReconciler( | ||
airbyte=airbyte_instance, | ||
connections=[], | ||
) | ||
# end_define_reconciler | ||
|
||
# start_define_sources | ||
from dagster_airbyte.managed.generated.sources import FileSource | ||
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination | ||
|
||
cereals_csv_source = FileSource( | ||
name="cereals-csv", | ||
url="https://docs.dagster.io/assets/cereal.csv", | ||
format="csv", | ||
provider=FileSource.HTTPSPublicWeb(), | ||
dataset_name="cereals", | ||
) | ||
|
||
local_json_destination = LocalJsonDestination( | ||
name="local-json", | ||
destination_path="/local/cereals_out.json", | ||
) | ||
# end_define_sources | ||
|
||
# start_define_connection | ||
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode | ||
|
||
cereals_connection = AirbyteConnection( | ||
name="download-cereals", | ||
source=cereals_csv_source, | ||
destination=local_json_destination, | ||
stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()}, | ||
) | ||
# end_define_connection | ||
|
||
# start_new_reconciler | ||
airbyte_reconciler = AirbyteManagedElementReconciler( | ||
airbyte=airbyte_instance, | ||
connections=[cereals_connection], | ||
) | ||
# end_new_reconciler | ||
|
||
# start_new_reconciler_delete | ||
airbyte_reconciler = AirbyteManagedElementReconciler( | ||
airbyte=airbyte_instance, connections=[...], delete_unmentioned_resources=True | ||
) | ||
# end_new_reconciler_delete | ||
|
||
# start_load_assets | ||
from dagster_airbyte import load_assets_from_connections, airbyte_resource | ||
|
||
airbyte_instance = airbyte_resource.configured( | ||
{ | ||
"host": "localhost", | ||
"port": 8000, | ||
# If using basic auth, include username and password: | ||
"username": "airbyte", | ||
"password": {"env": "AIRBYTE_PASSWORD"}, | ||
} | ||
) | ||
|
||
airbyte_assets = load_assets_from_connections( | ||
airbyte=airbyte_instance, connections=[cereals_connection] | ||
) | ||
# end_load_assets |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters