[examples] For modern_data_stack_assets, more detailed README, helper script (#6914)
OwenKephart committed Mar 3, 2022
1 parent 498b261 commit 7f35164
Showing 8 changed files with 252 additions and 10,232 deletions.
65 changes: 65 additions & 0 deletions examples/modern_data_stack_assets/README.md
@@ -1,2 +1,67 @@
# Intro

This is an example of how to use the Software-Defined Asset APIs alongside Modern Data Stack tools
(specifically, Airbyte and dbt).
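If you haven't used the asset APIs before, a software-defined asset is just a decorated function that describes how to compute a piece of data. Here's a minimal, standalone sketch of what that looks like (not taken from this example's code; the asset name is made up):

```
from dagster import asset


@asset
def some_numbers():
    # the return value becomes the contents of the "some_numbers" asset
    return [1, 2, 3]
```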

# Setup

## Python

To install this example and its python dependencies, run:

```
$ pip install -e .
```

Once you've done this, you can run:

```
$ dagit -m modern_data_stack_assets
```

This will launch Dagit, Dagster's UI, where you can view this example.

If you try to kick off a run immediately, it will fail, as there is no source data to ingest/transform, nor is there an active Airbyte connection. To get everything set up properly, read on.

## Local Postgres

To keep things running on a single machine, we'll use a local Postgres instance as both the source and the destination for our data. You can imagine the "source" database as some online transactional database, and the "destination" as a data warehouse (something like Snowflake).

To get a Postgres instance with the required source and destination databases running on your machine, you can run:

```
$ docker pull postgres
$ docker run --name mds-demo -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres
$ PGPASSWORD=password psql -h localhost -p 5432 -U postgres -d postgres -c "CREATE DATABASE postgres_replica;"
```
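
If you'd like to confirm that both databases exist before moving on (optional), you can list them with `psql`:

```
$ PGPASSWORD=password psql -h localhost -p 5432 -U postgres -c "\l"
```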

## Airbyte

Now, you'll want to get Airbyte running locally. The full instructions can be found [here](https://docs.airbyte.com/deploying-airbyte/local-deployment), but the short version is to run the following (in a separate terminal):

```
$ git clone https://github.com/airbytehq/airbyte.git
$ cd airbyte
$ docker-compose up
```

Once you've done this, you should be able to go to http://localhost:8000, and see Airbyte's UI.

## Data and Connections

Now, you'll want to seed some data into the empty database you just created, and create an Airbyte connection between the source and destination databases.

There's a script provided that should handle all of this for you, which you can run with:

```
$ python -m modern_data_stack_assets.setup_airbyte
```

At the end of the script's output, you should see something like:

```
Created Airbyte Connection: c90cb8a5-c516-4c1a-b243-33dfe2cfb9e8
```

This connection id is specific to your local setup, so you'll need to update `constants.py` with this
value. Once you've updated your `constants.py` file, you're good to go!
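
For example, with the sample id shown above, the relevant line in `constants.py` would look like this (your id will differ):

```
AIRBYTE_CONNECTION_ID = "c90cb8a5-c516-4c1a-b243-33dfe2cfb9e8"  # the id printed by setup_airbyte
```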
@@ -48,6 +48,6 @@ def predicted_orders(
    resource_defs={
        "airbyte": airbyte_resource.configured(AIRBYTE_CONFIG),
        "dbt": dbt_cli_resource.configured(DBT_CONFIG),
-        "pandas_io_manager": pandas_io_manager.configured(PG_CONFIG),
+        "pandas_io_manager": pandas_io_manager.configured(PANDAS_IO_CONFIG),
    },
).build_job("Assets")
@@ -3,18 +3,43 @@

from dagster.utils import file_relative_path

+# =========================================================================
+# To get this value, run `python -m modern_data_stack_assets.setup_airbyte`
+# and grab the connection id that it prints
+AIRBYTE_CONNECTION_ID = "your_airbyte_connection_id"
+# =========================================================================


def model_func(x, a, b):
-    return a * np.exp(b * (x / 10**18 - 1.6095))
+    return a * np.exp(b * (x / 10 ** 18 - 1.6095))


+PG_SOURCE_CONFIG = {
+    "username": "postgres",
+    "password": "password",
+    "host": "localhost",
+    "port": 5432,
+    "database": "postgres",
+}
+PG_DESTINATION_CONFIG = {
+    "username": "postgres",
+    "password": "password",
+    "host": "localhost",
+    "port": 5432,
+    "database": "postgres_replica",
+}


-AIRBYTE_CONNECTION_ID = "your_airbyte_connection_id"
AIRBYTE_CONFIG = {"host": "localhost", "port": "8000"}
DBT_PROJECT_DIR = file_relative_path(__file__, "../mds_dbt")
DBT_PROFILES_DIR = file_relative_path(__file__, "../mds_dbt/config")
DBT_CONFIG = {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}
-PG_CONFIG = {
+PANDAS_IO_CONFIG = {
    "con_string": get_conn_string(
-        username="postgres", password="password", hostname="localhost", db_name="postgres_replica"
+        username=PG_DESTINATION_CONFIG["username"],
+        password=PG_DESTINATION_CONFIG["password"],
+        hostname=PG_DESTINATION_CONFIG["host"],
+        port=str(PG_DESTINATION_CONFIG["port"]),
+        db_name=PG_DESTINATION_CONFIG["database"],
    )
}

This file was deleted.

@@ -0,0 +1,154 @@
"""
A basic script that will create tables in the source postgres database, then automatically
create an Airbyte Connection between the source database and destination database.
"""
import random

import numpy as np
import pandas as pd
from dagster import check
from dagster_postgres.utils import get_conn_string
from dagster_airbyte import AirbyteResource

from .constants import PG_SOURCE_CONFIG, PG_DESTINATION_CONFIG


# configures the number of records for each table
N_USERS = 100
N_ORDERS = 10000


def _create_ab_source(client: AirbyteResource) -> str:
    workspace_id = client.make_request("/workspaces/list", data={})["workspaces"][0]["workspaceId"]

    # get latest available Postgres source definition
    source_defs = client.make_request(
        "/source_definitions/list_latest", data={"workspaceId": workspace_id}
    )
    postgres_definitions = [
        sd for sd in source_defs["sourceDefinitions"] if sd["name"] == "Postgres"
    ]
    if not postgres_definitions:
        raise check.CheckError("Expected at least one Postgres source definition.")
    source_definition_id = postgres_definitions[0]["sourceDefinitionId"]

    # create Postgres source
    source_id = client.make_request(
        "/sources/create",
        data={
            "sourceDefinitionId": source_definition_id,
            "connectionConfiguration": dict(**PG_SOURCE_CONFIG, ssl=False),
            "workspaceId": workspace_id,
            "name": "Source Database",
        },
    )["sourceId"]
    print(f"Created Airbyte Source: {source_id}")
    return source_id


def _create_ab_destination(client: AirbyteResource) -> str:
    workspace_id = client.make_request("/workspaces/list", data={})["workspaces"][0]["workspaceId"]

    # get the latest available Postgres destination definition
    destination_defs = client.make_request(
        "/destination_definitions/list_latest", data={"workspaceId": workspace_id}
    )
    postgres_definitions = [
        dd for dd in destination_defs["destinationDefinitions"] if dd["name"] == "Postgres"
    ]
    if not postgres_definitions:
        raise check.CheckError("Expected at least one Postgres destination definition.")
    destination_definition_id = postgres_definitions[0]["destinationDefinitionId"]

    # create Postgres destination
    destination_id = client.make_request(
        "/destinations/create",
        data={
            "destinationDefinitionId": destination_definition_id,
            "connectionConfiguration": dict(**PG_DESTINATION_CONFIG, schema="public", ssl=False),
            "workspaceId": workspace_id,
            "name": "Destination Database",
        },
    )["destinationId"]
    print(f"Created Airbyte Destination: {destination_id}")
    return destination_id


def setup_airbyte():
    client = AirbyteResource(host="localhost", port="8000", use_https=False)
    source_id = _create_ab_source(client)
    destination_id = _create_ab_destination(client)

    source_catalog = client.make_request("/sources/discover_schema", data={"sourceId": source_id})[
        "catalog"
    ]

    # create a connection between the new source and destination
    connection_id = client.make_request(
        "/connections/create",
        data={
            "name": "Example Connection",
            "sourceId": source_id,
            "destinationId": destination_id,
            "syncCatalog": source_catalog,
            "status": "active",
        },
    )["connectionId"]

    print(f"Created Airbyte Connection: {connection_id}")


def _random_dates():
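    # generate N_ORDERS timestamps between 2021-01-01 and 2022-01-01, skewed toward
    # the end of the range via an exponential distribution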

    start = pd.to_datetime("2021-01-01")
    end = pd.to_datetime("2022-01-01")

    start_u = start.value // 10 ** 9
    end_u = end.value // 10 ** 9

    dist = np.random.standard_exponential(size=N_ORDERS) / 10

    clipped_flipped_dist = 1 - dist[dist <= 1]
    clipped_flipped_dist = clipped_flipped_dist[:-1]

    if len(clipped_flipped_dist) < N_ORDERS:
        clipped_flipped_dist = np.append(
            clipped_flipped_dist, clipped_flipped_dist[: N_ORDERS - len(clipped_flipped_dist)]
        )

    return pd.to_datetime((clipped_flipped_dist * (end_u - start_u)) + start_u, unit="s")


def add_data():
    con_string = get_conn_string(
        username=PG_SOURCE_CONFIG["username"],
        password=PG_SOURCE_CONFIG["password"],
        hostname=PG_SOURCE_CONFIG["host"],
        port=str(PG_SOURCE_CONFIG["port"]),
        db_name=PG_SOURCE_CONFIG["database"],
    )

    users = pd.DataFrame(
        {
            "user_id": range(N_USERS),
            "is_bot": [random.choice([True, False]) for _ in range(N_USERS)],
        }
    )

    users.to_sql("users", con=con_string, if_exists="replace")
    print("Created users table.")

    orders = pd.DataFrame(
        {
            "user_id": [random.randint(0, N_USERS) for _ in range(N_ORDERS)],
            "order_time": _random_dates(),
            "order_value": np.random.normal(loc=100.0, scale=15.0, size=N_ORDERS),
        }
    )

    orders.to_sql("orders", con=con_string, if_exists="replace")
    print("Created orders table.")


add_data()
setup_airbyte()
