[docs] [snowflake] Add partitions to snowflake guide (#12231)
jamiedemaria committed Feb 16, 2023
1 parent c251806 commit 532ced5
Showing 10 changed files with 286 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

166 changes: 166 additions & 0 deletions docs/content/integrations/snowflake/reference.mdx
@@ -9,6 +9,7 @@ This reference page provides information for working with [`dagster-snowflake`](

- [Authenticating using a private key](#authenticating-using-a-private-key)
- [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset)
- [Storing partitioned assets](#storing-partitioned-assets)
- [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas)
- [Using the Snowflake I/O manager with other I/O managers](#using-the-snowflake-io-manager-with-other-io-managers)
- [Storing and loading PySpark DataFrames in Snowflake](#storing-and-loading-pyspark-dataframes-in-snowflake)
@@ -110,6 +111,171 @@ When Dagster materializes `sepal_data` and loads the `iris_dataset` asset using

---

## Storing partitioned assets

The Snowflake I/O manager supports storing and loading partitioned data. To store and load the correct rows of the Snowflake table, the I/O manager needs to know which column contains the data that defines the partition bounds. It uses this information to construct the queries that select or replace the data. The following sections describe how the I/O manager constructs these queries for different types of partitions.

### Storing static partitioned assets

In order to store static partitioned assets in Snowflake, you must specify `partition_expr` metadata on the asset to tell the Snowflake I/O manager which column contains the partition data:

```python file=/integrations/snowflake/static_partition.py startafter=start_example endbefore=end_example
import pandas as pd

from dagster import StaticPartitionDefinition, asset


@asset(
    partitions_def=StaticPartitionDefinition(
        ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
    ),
    metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    species = context.asset_partition_key_for_output()

    full_df = pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )

    return full_df[full_df["Species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()
```

Dagster uses the `partition_expr` metadata to craft the `SELECT` statement when loading the partition in the downstream asset. When loading a static partition, the following statement is used:

```sql
SELECT *
WHERE [partition_expr] = [selected partition]
```

When the `partition_expr` value is injected into this statement, the resulting SQL query must follow Snowflake's SQL syntax. Refer to the [Snowflake documentation](https://docs.snowflake.com/en/sql-reference/constructs) for more information.

When materializing the above assets, a partition must be selected, as described in [Materializing partitioned assets](/concepts/partitions-schedules-sensors/partitions#materializing-partitioned-assets). In this example, the query used when materializing the `Iris-setosa` partition of the above assets would be:

```sql
SELECT *
WHERE SPECIES = 'Iris-setosa'
```
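As a rough illustration of how the `partition_expr` value and the selected partition key combine into the clause above, here is a minimal sketch. The helper name `static_partition_where` is hypothetical and is not part of `dagster-snowflake`; the real I/O manager may build its query differently.

```python
def static_partition_where(partition_expr: str, partition_key: str) -> str:
    """Build the WHERE clause for one static partition.

    Hypothetical helper for illustration only; it is not the
    dagster-snowflake implementation.
    """
    # Double any single quotes so the key is a valid SQL string literal.
    escaped_key = partition_key.replace("'", "''")
    return f"WHERE {partition_expr} = '{escaped_key}'"


if __name__ == "__main__":
    # partition_expr comes from the asset metadata, the key from the run.
    print(static_partition_where("SPECIES", "Iris-setosa"))
```

Because `partition_expr` is pasted into the query as-is, it can be any expression that is valid on the left-hand side of a Snowflake comparison, not only a bare column name.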

### Storing time partitioned assets

As with static partitioned assets, you can specify `partition_expr` metadata on a time-partitioned asset to tell the Snowflake I/O manager which column contains the partition data:

```python file=/integrations/snowflake/time_partition.py startafter=start_example endbefore=end_example
import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    metadata={"partition_expr": "TO_TIMESTAMP(time::INT)"},
)
def iris_data_per_day(context) -> pd.DataFrame:
    partition = context.asset_partition_key_for_output()

    # get_iris_data_for_date fetches all of the iris data for a given date.
    # The returned dataframe contains a column named 'time' that stores
    # the time of each row as an integer number of seconds since the epoch.
    return get_iris_data_for_date(partition)


@asset
def iris_cleaned(iris_data_per_day: pd.DataFrame):
    return iris_data_per_day.dropna().drop_duplicates()
```

Dagster uses the `partition_expr` metadata to craft the `SELECT` statement when loading the correct partition in the downstream asset. When loading a time partition, the following statement is used:

```sql
SELECT *
WHERE [partition_expr] >= [partition_start]
AND [partition_expr] < [partition_end]
```

When the `partition_expr` value is injected into this statement, the resulting SQL query must follow Snowflake's SQL syntax. Refer to the [Snowflake documentation](https://docs.snowflake.com/en/sql-reference/constructs) for more information.

When materializing the above assets, a partition must be selected, as described in [Materializing partitioned assets](/concepts/partitions-schedules-sensors/partitions#materializing-partitioned-assets). The `[partition_start]` and `[partition_end]` bounds are of the form `YYYY-MM-DD HH:MM:SS`. In this example, the query when materializing the `2023-01-02` partition of the above assets would be:

```sql
SELECT *
WHERE TO_TIMESTAMP(time::INT) >= '2023-01-02 00:00:00'
AND TO_TIMESTAMP(time::INT) < '2023-01-03 00:00:00'
```

In this example, the data in the **TIME** column are integers (seconds since the epoch), so the `partition_expr` metadata includes a SQL expression that converts those integers to timestamps. For a full list of available functions, refer to the [Snowflake function reference](https://docs.snowflake.com/en/sql-reference/functions-all).
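To make the window arithmetic concrete, the following sketch derives the `[partition_start]` and `[partition_end]` bounds for a daily partition key and shows roughly what the epoch-seconds conversion produces. All helper names are hypothetical, the conversion assumes UTC, and none of this is the `dagster-snowflake` implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def daily_partition_bounds(partition_key: str) -> Tuple[str, str]:
    """Return the half-open [start, end) bounds for a key like '2023-01-02'."""
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    end = start + timedelta(days=1)
    return start.strftime(TIMESTAMP_FORMAT), end.strftime(TIMESTAMP_FORMAT)


def daily_partition_where(partition_expr: str, partition_key: str) -> str:
    """Build the WHERE clause for one daily partition (illustrative only)."""
    start, end = daily_partition_bounds(partition_key)
    return (
        f"WHERE {partition_expr} >= '{start}'\n"
        f"AND {partition_expr} < '{end}'"
    )


def epoch_seconds_to_timestamp(seconds: int) -> str:
    """Approximate what converting an epoch-seconds integer yields (UTC assumed)."""
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime(TIMESTAMP_FORMAT)


if __name__ == "__main__":
    print(daily_partition_where("TO_TIMESTAMP(time::INT)", "2023-01-02"))
    # 1672617600 seconds after the epoch is midnight UTC on 2023-01-02,
    # so a row with this value falls inside the 2023-01-02 partition.
    print(epoch_seconds_to_timestamp(1672617600))
```

The upper bound is exclusive, so a row stamped exactly at midnight on `2023-01-03` belongs to the next partition rather than to both.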

### Storing multi-partitioned assets

The Snowflake I/O manager can also store data partitioned on multiple dimensions. To do this, you must specify the column for each partition as a dictionary of `partition_expr` metadata:

```python file=/integrations/snowflake/multi_partition.py startafter=start_example endbefore=end_example
import pandas as pd

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": StaticPartitionDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    metadata={
        "partition_expr": {"date": "TO_TIMESTAMP(time::INT)", "species": "SPECIES"}
    },
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    partition = context.partition_key.keys_by_dimension
    species = partition["species"]
    date = partition["date"]

    # get_iris_data_for_date fetches all of the iris data for a given date.
    # The returned dataframe contains a column named 'time' that stores
    # the time of each row as an integer number of seconds since the epoch.
    full_df = get_iris_data_for_date(date)

    return full_df[full_df["Species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()
```

Dagster uses the `partition_expr` metadata to craft the `SELECT` statement when loading the correct partition in a downstream asset. For multi-partitions, Dagster combines the `WHERE` conditions described in the above sections, joined with `AND`, to craft the correct `SELECT` statement.

When materializing the above assets, a partition must be selected, as described in [Materializing partitioned assets](/concepts/partitions-schedules-sensors/partitions#materializing-partitioned-assets). For example, when materializing the `2023-01-02|Iris-setosa` partition of the above assets, the following query will be used:

```sql
SELECT *
WHERE SPECIES = 'Iris-setosa'
AND TO_TIMESTAMP(time::INT) >= '2023-01-02 00:00:00'
AND TO_TIMESTAMP(time::INT) < '2023-01-03 00:00:00'
```
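The sketch below shows one way the per-dimension conditions could be combined, using the `partition_expr` dictionary from the asset above. The function `multi_partition_where` and its `time_dimensions` argument are hypothetical names introduced for illustration, and daily time partitions are assumed; the actual I/O manager may order or format the conditions differently.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def multi_partition_where(
    partition_exprs: Dict[str, str],
    keys_by_dimension: Dict[str, str],
    time_dimensions: Set[str],
) -> str:
    """Join one condition per partition dimension with AND (illustrative only)."""
    conditions: List[str] = []
    for dimension, expr in partition_exprs.items():
        key = keys_by_dimension[dimension]
        if dimension in time_dimensions:
            # Assumption: daily partitions, so the window spans one day.
            start = datetime.strptime(key, "%Y-%m-%d")
            end = start + timedelta(days=1)
            conditions.append(f"{expr} >= '{start.strftime(TIMESTAMP_FORMAT)}'")
            conditions.append(f"{expr} < '{end.strftime(TIMESTAMP_FORMAT)}'")
        else:
            # Static dimension: a simple equality check on the key.
            conditions.append(f"{expr} = '{key}'")
    return "WHERE " + "\nAND ".join(conditions)


if __name__ == "__main__":
    print(
        multi_partition_where(
            {"species": "SPECIES", "date": "TO_TIMESTAMP(time::INT)"},
            {"species": "Iris-setosa", "date": "2023-01-02"},
            time_dimensions={"date"},
        )
    )
```

Since every condition is joined with `AND`, the order in which the dimensions are visited does not change which rows are selected.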

---

## Storing tables in multiple schemas

You may want to have different assets stored in different Snowflake schemas. The Snowflake I/O manager allows you to specify the schema in several ways.
@@ -11,7 +11,7 @@ your data warehouse.
Related Guides:

* `Using Dagster with Snowflake guide </integrations/snowflake>`_

* `Snowflake I/O manager reference </integrations/snowflake/reference>`_

.. currentmodule:: dagster_snowflake_pandas

@@ -11,6 +11,7 @@ your data warehouse.
Related Guides:

* `Using Dagster with Snowflake guide </integrations/snowflake>`_
* `Snowflake I/O manager reference </integrations/snowflake/reference>`_


.. currentmodule:: dagster_snowflake_pyspark
@@ -11,6 +11,7 @@ your data warehouse.
Related Guides:

* `Using Dagster with Snowflake </integrations/snowflake>`_
* `Snowflake I/O manager reference </integrations/snowflake/reference>`_
* `Transitioning Data Pipelines from Development to Production </guides/dagster/transitioning-data-pipelines-from-development-to-production>`_
* `Testing Against Production with Dagster Cloud Branch Deployments </guides/dagster/branch_deployments>`_

@@ -0,0 +1,48 @@
def get_iris_data_for_date(*args, **kwargs):
    pass


# start_example

import pandas as pd

from dagster import (
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": StaticPartitionDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    metadata={
        "partition_expr": {"date": "TO_TIMESTAMP(time::INT)", "species": "SPECIES"}
    },
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    partition = context.partition_key.keys_by_dimension
    species = partition["species"]
    date = partition["date"]

    # get_iris_data_for_date fetches all of the iris data for a given date.
    # The returned dataframe contains a column named 'time' that stores
    # the time of each row as an integer number of seconds since the epoch.
    full_df = get_iris_data_for_date(date)

    return full_df[full_df["Species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()


# end_example
@@ -0,0 +1,36 @@
# start_example

import pandas as pd

from dagster import StaticPartitionDefinition, asset


@asset(
    partitions_def=StaticPartitionDefinition(
        ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
    ),
    metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    species = context.asset_partition_key_for_output()

    full_df = pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )

    return full_df[full_df["Species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()


# end_example
@@ -0,0 +1,30 @@
def get_iris_data_for_date(*args, **kwargs):
    pass


# start_example

import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    metadata={"partition_expr": "TO_TIMESTAMP(time::INT)"},
)
def iris_data_per_day(context) -> pd.DataFrame:
    partition = context.asset_partition_key_for_output()

    # get_iris_data_for_date fetches all of the iris data for a given date.
    # The returned dataframe contains a column named 'time' that stores
    # the time of each row as an integer number of seconds since the epoch.
    return get_iris_data_for_date(partition)


@asset
def iris_cleaned(iris_data_per_day: pd.DataFrame):
    return iris_data_per_day.dropna().drop_duplicates()


# end_example
