Skip to content

Commit

Permalink
[dagster-managed-elements] Getting started guide
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Oct 27, 2022
1 parent 886da61 commit 1cac966
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,10 @@
{
"title": "Using custom run coordinators",
"path": "/guides/dagster/run-attribution"
},
{
"title": "Managed elements",
"path": "/guides/dagster/managed-elements"
}
]
}
Expand Down
162 changes: 162 additions & 0 deletions docs/content/guides/dagster/managed-elements.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
---
title: Managed elements | Dagster
---

# Mananged elements <Experimental />

This guide provides an intro to Dagster **managed elements**. Managed elements allow you to centralize the configuration for your data stack, specifying configuration in Python code. You can check-in and version your config with version control or programatically generate config for more complex use-cases.

Currently, the following integrations support managed elements:

- [`dagster-airbyte`](/integrations/airbyte)

---

## Getting started

In order to use managed elements, you must first install the `dagster-managed-elements` package. This includes the base APIs used by each implementation, as well as making the managed elements CLI available, available as `dagster-me`.

Every managed element is controlled by a `ManagedElementReconciler` implementation. Typically, all of the reconcilers in your project are colocated in a single Python module, which the `dagster-me` CLI will use to discover reconcilers.

### Using the `dagster-me` CLI

The `dagster-me` CLI allows you to check the synchronization status of your managed elements, as well as to apply any config changes.

### Checking the status of your managed elements

To load all managed element reconcilers in a module and check their status, you can run:

```bash
dagster-me check --module my_python_module.my_submodule
```

This will print out a diff of the current state and the desired state for each reconciler.

You may also specify a working directory to search for the module in:

```bash
dagster-me check --module my_python_module.my_submodule --working-directory ./my_project
```

Additionally, you can specify one or more Python object paths to select a specific reconciler:

```bash
dagster-me check --module my_python_module.my_submodule:my_reconciler
```

#### Applying changes

To apply changes to your managed elements, you can instead run the `apply` command:

```bash
dagster-me apply --module my_python_module.my_submodule
```

## Using managed elements to configure Airbyte

As an alternative to configuring Airbyte using its UI, you may create and update Airbyte sources, destinations, and connections using managed elements.

### Defining a reconciler

The first step is to create a reconciler, which is pointed at a specific Airbyte instance using an Airbyte resource. It is provided with a list of connections to reconcile, which we will set up next.

```python startafter=start_define_reconciler endbefore=end_define_reconciler file=/guides/dagster/managed_elements/airbyte.py dedent=4
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": "8000",
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)

airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[],
)
```

### Defining sources and destinations

Next, we will define our sources and destinations. These are defined using the `AirbyteSource` and `AirbyteDestination` classes, respectively.

```python startafter=start_define_sources endbefore=end_define_sources file=/guides/dagster/managed_elements/airbyte.py dedent=4
from dagster_airbyte import AirbyteSource, AirbyteDestination

cereals_csv_source = AirbyteSource(
name="cereals-csv",
source_type="File",
source_configuration={
"url": "https://docs.dagster.io/assets/cereal.csv",
"format": "csv",
"provider": {"storage": "HTTPS", "user_agent": False},
"dataset_name": "cereals",
},
)

local_json_destination = AirbyteDestination(
name="local-json",
destination_type="Local JSON",
destination_configuration={"destination_path": "/local/cereals_out.json"},
)
```

### Defining a connection

Finally, we will define a connection between our source and destination. This is defined using the `AirbyteConnection` class.

```python startafter=start_define_connection endbefore=end_define_connection file=/guides/dagster/managed_elements/airbyte.py dedent=4
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

cereals_connection = AirbyteConnection(
name="download-cereals",
source=cereals_csv_source,
destination=local_json_destination,
stream_config={"cereals": AirbyteSyncMode.FULL_REFRESH_OVERWRITE},
)
```

We'll supply our new connection to the reconciler we defined earlier:

```python startafter=start_new_reconciler endbefore=end_new_reconciler file=/guides/dagster/managed_elements/airbyte.py dedent=4
airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[cereals_connection],
)
```

### Applying changes

Next, we'll use the `dagster-me` CLI to sanity check our reconciler and apply any changes:

```bash
dagster-me check --module my_python_module.my_submodule:reconciler

Found 1 managed elements, checking...
+ cereals-csv:
+ url: https://docs.dagster.io/assets/cereal.csv
+ format: csv
+ dataset_name: cereals
+ provider:
+ user_agent: False
+ storage: HTTPS
+ local-json:
+ destination_path: /local/cereals_out.json
+ download-cereals:
+ source: cereals-csv
+ destination: local-json
+ normalize data: None
+ streams:
+ cereals: FULL_REFRESH_OVERWRITE
```

Since this looks good, we'll apply the changes:

```bash
dagster-me apply --module my_python_module.my_submodule:reconciler
```

Now, we should see our new connection in the Airbyte UI!
2 changes: 2 additions & 0 deletions docs/content/guides/experimental-features.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ title: Experimental Feature Guides | Dagster Docs
- [Using versioning and memoization](/guides/dagster/memoization) - Learn to use Dagster's versioning and memoization features in job re-execution

- [Using Custom Run Coordinators to perform run attribution](/guides/dagster/run-attribution) - A look at using a Custom Run Coordinator to perform run attribution

- [Using managed elements](/guides/dagster/managed-elements) - A look at using Dagster managed elements to configure your data stack in code
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# isort: skip_file
# pylint: disable=unused-variable


def scope_define_reconciler():
# start_define_reconciler
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": "8000",
# If using basic auth, include username and password:
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)

airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[],
)
# end_define_reconciler

# start_define_sources
from dagster_airbyte import AirbyteSource, AirbyteDestination

cereals_csv_source = AirbyteSource(
name="cereals-csv",
source_type="File",
source_configuration={
"url": "https://docs.dagster.io/assets/cereal.csv",
"format": "csv",
"provider": {"storage": "HTTPS", "user_agent": False},
"dataset_name": "cereals",
},
)

local_json_destination = AirbyteDestination(
name="local-json",
destination_type="Local JSON",
destination_configuration={"destination_path": "/local/cereals_out.json"},
)
# end_define_sources

# start_define_connection
from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

cereals_connection = AirbyteConnection(
name="download-cereals",
source=cereals_csv_source,
destination=local_json_destination,
stream_config={"cereals": AirbyteSyncMode.FULL_REFRESH_OVERWRITE},
)
# end_define_connection

# start_new_reconciler
airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[cereals_connection],
)
# end_new_reconciler

0 comments on commit 1cac966

Please sign in to comment.