Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-managed-elements] Getting started guide #10224

Merged
merged 16 commits into from
Dec 6, 2022
Merged
16 changes: 15 additions & 1 deletion docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,17 @@
"children": [
{
"title": "Airbyte",
"path": "/integrations/airbyte"
"path": "/integrations/airbyte",
"children": [
{
"title": "Airbyte + Dagster",
"path": "/integrations/airbyte"
},
{
"title": "Ingestion as code",
"path": "/guides/dagster/airbyte-ingestion-as-code"
}
]
},
{
"title": "Airflow",
Expand Down Expand Up @@ -705,6 +715,10 @@
{
"title": "Using custom run coordinators",
"path": "/guides/dagster/run-attribution"
},
{
"title": "Airbyte ingestion as code",
"path": "/guides/dagster/airbyte-ingestion-as-code"
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

215 changes: 215 additions & 0 deletions docs/content/guides/dagster/airbyte-ingestion-as-code.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
---
title: Airbyte ingestion as code | Dagster Docs
---

# Airbyte ingestion as code

<Note>
This feature is considered <strong>experimental</strong>.
</Note>

This guide provides an introduction to using Dagster to configure your [Airbyte](/integrations/airbyte) connections. This allows you to centralize the configuration for your data stack, specifying configuration in Python code. You can check-in and version your config with version control or programmatically generate config for more complex use cases.

---

## Prerequisites

To use this feature, you'll need to install the `dagster-airbyte` and `dagster-managed-elements` libraries:

```bash
pip install dagster-airbyte dagster-managed-elements
```

Available as `dagster-me`, the `dagster-managed-elements` library includes the base config reconciliation APIs and a CLI.

---

## Step 1: Define a reconciler

The config for your Airbyte instance is specified in an `AirbyteManagedElementReconciler`, which is pointed at a specific Airbyte instance using an Airbyte resource. The config is also provided with a list of connections to reconcile, which we'll set up later in the guide.

```python startafter=start_define_reconciler endbefore=end_define_reconciler file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": "8000",
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)

airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[],
)
```

For more info on setting up an Airbyte resource, refer to [the Airbyte guide](/integrations/airbyte#step-1-connecting-to-airbyte). Additional configuration options for the reconciler are [detailed below](#additional-configuration-options).

For more information on setting up secrets from the environment, refer to the [Environment variables and secrets guide](/guides/dagster/using-environment-variables-and-secrets).

---

## Step 2: Define sources and destinations

Next, we'll define our sources and destinations. Sources and destinations can be constructed manually using the `AirbyteSource` and `AirbyteDestination` classes, respectively, but `dagster-airbyte` also provides generated, typed classes for specific source and destination types. Refer to the [Airbyte API docs](/\_apidocs/libraries/dagster-airbyte#managed-config-generated-sources) for the properties required to configure each source and destination type.

In this example, we'll configure a source that reads from a hosted CSV file and a destination that writes it to a local JSON file. To do this, we'll import the generated classes for the `File` source and `Local JSON` destination:

```python startafter=start_define_sources endbefore=end_define_sources file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
from dagster_airbyte.managed.generated.sources import FileSource
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination

cereals_csv_source = FileSource(
name="cereals-csv",
url="https://docs.dagster.io/assets/cereal.csv",
format="csv",
provider=FileSource.HTTPSPublicWeb(),
dataset_name="cereals",
)

local_json_destination = LocalJsonDestination(
name="local-json",
destination_path="/local/cereals_out.json",
)
```

---

## Step 3: Define a connection

Next, we'll define a connection between the source and destination using the [`AirbyteConnection`](/\_apidocs/libraries/dagster-airbyte#dagster_airbyte.AirbyteConnection) class:

```python startafter=start_define_connection endbefore=end_define_connection file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

cereals_connection = AirbyteConnection(
name="download-cereals",
source=cereals_csv_source,
destination=local_json_destination,
stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
)
```

Then, we'll supply the new connection to the reconciler we defined in [Step 1](#step-1-define-a-reconciler):

```python startafter=start_new_reconciler endbefore=end_new_reconciler file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[cereals_connection],
)
```

---

## Step 4. Validate changes

Next, we'll use the `dagster-me` CLI to sanity-check our reconciler and apply any changes.

The `check` command prints out differences between the current state of the Airbyte instance and the desired state specified in the reconciler. To invoke the CLI, point it at a module containing the reconciler:

```bash
dagster-me check --module my_python_module.my_submodule:reconciler

Found 1 reconciler, checking...
+ cereals-csv:
+ url: https://docs.dagster.io/assets/cereal.csv
+ format: csv
+ dataset_name: cereals
+ provider:
+ user_agent: False
+ storage: HTTPS
+ local-json:
+ destination_path: /local/cereals_out.json
+ download-cereals:
+ source: cereals-csv
+ destination: local-json
+ normalize data: None
+ streams:
+ cereals: FULL_REFRESH_OVERWRITE
```

---

## Step 5. Apply changes

As the changes printed out by the `check` command in the previous step look like what we expect, we can now apply them:

```bash
dagster-me apply --module my_python_module.my_submodule:reconciler
```

Now, we should see our new connection in the Airbyte UI:

<Image
alt="instance-overview"
src="/images/guides/airbyte-ingestion-as-code/connection-in-ui.png"
width={755}
height={486}
/>

---

## Step 6. Load connections into Dagster

To load managed connections into Dagster, use the `load_assets_from_connections` utility method. This functions similarly to [`load_assets_from_airbyte_instance`](/integrations/airbyte#loading-airbyte-asset-definitions-from-an-airbyte-instance), but validates that the connections passed in match the connections defined in your Airbyte instance:

```python startafter=start_load_assets endbefore=end_load_assets file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
from dagster_airbyte import load_assets_from_connections, airbyte_resource

airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": 8000,
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)

airbyte_assets = load_assets_from_connections(
airbyte=airbyte_instance, connections=[cereals_connection]
)
```

For more info on managing Airbyte assets in Dagster, refer to the [Airbyte guide](/integrations/airbyte).

---

## Additional configuration options

The Airbyte reconciler also supports some additional configuration options, which can be passed to the `AirbyteManagedElementReconciler` constructor.

By default, the reconciler will not modify any sources, destinations, or connections which are outside of those specified in the reconciler. This allows you to adopt the reconciler incrementally, without having to reconcile all of your existing Airbyte configuration.

To reconcile all of your existing Airbyte configuration, pass `delete_unmentioned_resources=True` to the reconciler constructor:

```python startafter=start_new_reconciler_delete endbefore=end_new_reconciler_delete file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance, connections=[...], delete_unmentioned_resources=True
)
```

This tells the reconciler to clean up any sources, destinations, or connections which are not explicitly defined in Python code.

---

## Related

<ArticleList>
<ArticleListItem
href="/integrations/airbyte"
title="Airbyte + Dagster guide"
></ArticleListItem>
<ArticleListItem
href="/\_apidocs/libraries/dagster-airbyte#dagster_airbyte"
title="dagster-airbyte API reference"
></ArticleListItem>
<ArticleListItem
href="/guides/dagster/using-environment-variables-and-secrets"
title="Environment variables and secrets"
></ArticleListItem>
</ArticleList>
32 changes: 26 additions & 6 deletions docs/content/integrations/airbyte.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ from dagster_airbyte import airbyte_resource
airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": 8000,
"port": "8000",
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
Expand Down Expand Up @@ -274,9 +274,29 @@ If you have questions on using Airbyte with Dagster, we'd love to hear from you:
</a>
</p>

### Related material
---

- [`dagster-airbyte` API reference](/\_apidocs/libraries/dagster-airbyte)
- [Software-defined assets](/concepts/assets/software-defined-assets)
- [Resources](/concepts/resources)
- [Scheduling Dagster jobs](/concepts/partitions-schedules-sensors/schedules#running-the-scheduler)
## Related

<ArticleList>
<ArticleListItem
href="/\_apidocs/libraries/dagster-airbyte"
title="dagster-airbyte API reference"
></ArticleListItem>
<ArticleListItem
href="/guides/dagster/airbyte-ingestion-as-code"
title="Airbyte ingestion as code"
></ArticleListItem>
<ArticleListItem
href="/concepts/assets/software-defined-assets"
title="Software-defined assets"
></ArticleListItem>
<ArticleListItem
href="/concepts/resources"
title="Resources"
></ArticleListItem>
<ArticleListItem
href="/concepts/partitions-schedules-sensors/schedules#running-the-scheduler"
title="Scheduling Dagster jobs"
></ArticleListItem>
</ArticleList>
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Assets
Managed Config
==============

The following APIs are used as part of the experimental ingestion-as-code functionality.
For more information, see the `Airbyte ingestion as code guide </guides/dagster/airbyte-ingestion-as-code>`_.

.. autoclass:: AirbyteManagedElementReconciler
:members:
:special-members: __init__
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# isort: skip_file
# pylint: disable=unused-variable


def scope_define_reconciler():
# start_define_reconciler
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": "8000",
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)

airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[],
)
# end_define_reconciler

# start_define_sources
from dagster_airbyte.managed.generated.sources import FileSource
from dagster_airbyte.managed.generated.destinations import LocalJsonDestination

cereals_csv_source = FileSource(
name="cereals-csv",
url="https://docs.dagster.io/assets/cereal.csv",
format="csv",
provider=FileSource.HTTPSPublicWeb(),
dataset_name="cereals",
)

local_json_destination = LocalJsonDestination(
name="local-json",
destination_path="/local/cereals_out.json",
)
# end_define_sources

# start_define_connection
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

cereals_connection = AirbyteConnection(
name="download-cereals",
source=cereals_csv_source,
destination=local_json_destination,
stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
)
# end_define_connection

# start_new_reconciler
airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[cereals_connection],
)
# end_new_reconciler

# start_new_reconciler_delete
airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance, connections=[...], delete_unmentioned_resources=True
)
# end_new_reconciler_delete

# start_load_assets
from dagster_airbyte import load_assets_from_connections, airbyte_resource

airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": 8000,
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)

airbyte_assets = load_assets_from_connections(
airbyte=airbyte_instance, connections=[cereals_connection]
)
# end_load_assets
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def scope_define_instance():
airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": 8000,
"port": "8000",
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
Expand Down