Skip to content

Commit

Permalink
Snowflake IO Manager API docs (#10175)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 1, 2022
1 parent fc6ea04 commit b4c0930
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

Binary file modified docs/next/public/objects.inv
Binary file not shown.
13 changes: 10 additions & 3 deletions docs/sphinx/_ext/autodoc_dagster.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import json
import textwrap
from typing import Any, List, Tuple, Type, Union, cast
Expand Down Expand Up @@ -133,12 +134,18 @@ def add_content(self, more_content) -> None:
self.add_line("|", source_name)
self.add_line("", source_name)

obj = cast(Union[ConfigurableDefinition, Type[ConfigurableClass]], self.object)
if inspect.isfunction(self.object):
# self.object is a function that returns a configurable class eg build_snowflake_io_manager
obj = self.object([])
else:
obj = self.object

obj = cast(Union[ConfigurableDefinition, Type[ConfigurableClass]], obj)
config_field = None
if isinstance(obj, ConfigurableDefinition):
config_field = check.not_none(self.object.config_schema).as_field()
config_field = check.not_none(obj.config_schema).as_field()
elif isinstance(obj, type) and issubclass(obj, ConfigurableClass):
config_field = Field(self.object.config_type())
config_field = Field(obj.config_type())

for line in config_field_to_lines(config_field):
self.add_line(line, source_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ your data warehouse.
.. autoconfigurable:: snowflake_resource
:annotation: ResourceDefinition

.. autofunction:: build_snowflake_io_manager
.. autoconfigurable:: build_snowflake_io_manager
:annotation: IOManagerDefinition

.. autofunction:: snowflake_op_for_query

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,67 @@ def build_snowflake_io_manager(type_handlers: Sequence[DbTypeHandler]) -> IOMana
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
@asset(
key_prefix=["my_schema"] # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])
@repository
def my_repo():
return with_resources(
[my_table],
{"io_manager": snowflake_io_manager.configured({
"database": "my_database",
"account" : {"env": "SNOWFLAKE_ACCOUNT"}
...
})}
)
If you do not provide a schema, Dagster will determine a schema based on the assets and ops using
the IO Manager. For assets, the schema will be determined from the asset key.
For ops, the schema can be specified by including a "schema" entry in output metadata. If "schema" is not provided
via config or on the asset/op, "public" will be used for the schema.
.. code-block:: python
@op(
out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
# the returned value will be stored at my_schema.my_table
...
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the
In or AssetIn.
@job(resource_defs={'io_manager': snowflake_io_manager})
def my_job():
...
.. code-block:: python
@asset(
ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
# my_table will just contain the data from column "a"
...
"""

@io_manager(
config_schema={
"database": StringSource,
"account": StringSource,
"user": StringSource,
"password": StringSource,
"warehouse": Field(StringSource, is_required=False),
"schema": Field(StringSource, is_required=False),
"database": Field(StringSource, description="Name of the database to use."),
"account": Field(
StringSource,
description="Your Snowflake account name. For more details, see https://bit.ly/2FBL320.",
),
"user": Field(StringSource, description="User login name."),
"password": Field(StringSource, description="User password."),
"warehouse": Field(
StringSource, description="Name of the warehouse to use.", is_required=False
),
"schema": Field(
StringSource, description="Name of the schema to use", is_required=False
),
}
)
def snowflake_io_manager():
Expand Down

1 comment on commit b4c0930

@vercel
Copy link

@vercel vercel bot commented on b4c0930 Nov 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.