[duckdb-pyspark] add duckdb_pyspark_io_manager helper (#10894)
jamiedemaria committed Dec 6, 2022
1 parent a5758f2 commit 493cd5c
Showing 8 changed files with 72 additions and 16 deletions.
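At a glance, this commit packages the existing DuckDB/PySpark type handler into a prebuilt IO manager definition, so callers no longer assemble it by hand. A minimal before/after sketch based on the diffs below (the my_db.duckdb path is illustrative):

    # Before this commit: build the IO manager manually from the type handler.
    from dagster_duckdb.io_manager import build_duckdb_io_manager
    from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler

    duckdb_io_manager = build_duckdb_io_manager([DuckDBPySparkTypeHandler()]).configured(
        {"database": "my_db.duckdb"}  # illustrative database path
    )

    # After this commit: import the prebuilt definition directly.
    from dagster_duckdb_pyspark import duckdb_pyspark_io_manager

    io_manager = duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb"})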
2 changes: 1 addition & 1 deletion docs/content/api/modules.json


2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json


2 changes: 1 addition & 1 deletion docs/content/api/sections.json


Binary file modified docs/next/public/objects.inv
@@ -6,4 +6,7 @@ This library provides an integration with the `DuckDB <https://duckdb.org/>`_ d

.. currentmodule:: dagster_duckdb_pyspark

.. autoconfigurable:: duckdb_pyspark_io_manager
   :annotation: IOManagerDefinition

.. autoclass:: DuckDBPySparkTypeHandler
@@ -1 +1,2 @@
from .duckdb_pyspark_type_handler import DuckDBPySparkTypeHandler as DuckDBPySparkTypeHandler
from .duckdb_pyspark_type_handler import duckdb_pyspark_io_manager as duckdb_pyspark_io_manager
@@ -1,5 +1,5 @@
import pyspark
-from dagster_duckdb.io_manager import DuckDbClient, _connect_duckdb
+from dagster_duckdb.io_manager import DuckDbClient, _connect_duckdb, build_duckdb_io_manager
from pyspark.sql import SparkSession

from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema
@@ -74,3 +74,58 @@ def load_input(self, context: InputContext, table_slice: TableSlice) -> pyspark.
    @property
    def supported_types(self):
        return [pyspark.sql.DataFrame]


duckdb_pyspark_io_manager = build_duckdb_io_manager([DuckDBPySparkTypeHandler()])
duckdb_pyspark_io_manager.__doc__ = """
An IO manager definition that reads inputs from and writes PySpark DataFrames to DuckDB.

Returns:
    IOManagerDefinition

Examples:

    .. code-block:: python

        from dagster_duckdb_pyspark import duckdb_pyspark_io_manager

        @asset(
            key_prefix=["my_schema"]  # will be used as the schema in DuckDB
        )
        def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
            ...

        @repository
        def my_repo():
            return with_resources(
                [my_table],
                {"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb"})}
            )

    If you do not provide a schema, Dagster will determine a schema based on the assets and ops using
    the IO Manager. For assets, the schema will be determined from the asset key. For ops, the schema
    can be specified by including a "schema" entry in output metadata. If "schema" is not provided
    via config or on the asset/op, "public" will be used for the schema.

    .. code-block:: python

        @op(
            out={"my_table": Out(metadata={"schema": "my_schema"})}
        )
        def make_my_table() -> pyspark.sql.DataFrame:
            # the returned value will be stored at my_schema.my_table
            ...

    To only use specific columns of a table as input to a downstream op or asset, add the metadata
    "columns" to the In or AssetIn.

    .. code-block:: python

        @asset(
            ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
        )
        def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
            # my_table will just contain the data from column "a"
            ...

"""
@@ -3,8 +3,7 @@
import duckdb
import pandas as pd
import pytest
-from dagster_duckdb.io_manager import build_duckdb_io_manager
-from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler
+from dagster_duckdb_pyspark import duckdb_pyspark_io_manager
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import SparkSession

@@ -30,9 +29,8 @@ def make_df():


def test_duckdb_io_manager_with_ops(tmp_path):
-    duckdb_io_manager = build_duckdb_io_manager([DuckDBPySparkTypeHandler()])
    resource_defs = {
-        "io_manager": duckdb_io_manager.configured(
+        "io_manager": duckdb_pyspark_io_manager.configured(
            {"database": os.path.join(tmp_path, "unit_test.duckdb")}
        ),
    }
@@ -68,9 +66,8 @@ def b_plus_one(b_df: SparkDF) -> SparkDF:


def test_duckdb_io_manager_with_assets(tmp_path):
-    duckdb_io_manager = build_duckdb_io_manager([DuckDBPySparkTypeHandler()])
    resource_defs = {
-        "io_manager": duckdb_io_manager.configured(
+        "io_manager": duckdb_pyspark_io_manager.configured(
            {"database": os.path.join(tmp_path, "unit_test.duckdb")}
        ),
    }
@@ -102,9 +99,8 @@ def not_supported():


def test_not_supported_type(tmp_path):
-    duckdb_io_manager = build_duckdb_io_manager([DuckDBPySparkTypeHandler()])
    resource_defs = {
-        "io_manager": duckdb_io_manager.configured(
+        "io_manager": duckdb_pyspark_io_manager.configured(
            {"database": os.path.join(tmp_path, "unit_test.duckdb")}
        ),
    }
@@ -140,10 +136,11 @@ def daily_partitioned(context) -> SparkDF:


def test_partitioned_asset(tmp_path):
-    duckdb_io_manager = build_duckdb_io_manager([DuckDBPySparkTypeHandler()]).configured(
-        {"database": os.path.join(tmp_path, "unit_test.duckdb")}
-    )
-    resource_defs = {"io_manager": duckdb_io_manager}
+    resource_defs = {
+        "io_manager": duckdb_pyspark_io_manager.configured(
+            {"database": os.path.join(tmp_path, "unit_test.duckdb")}
+        )
+    }

    materialize(
        [daily_partitioned],
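The tests now consume the helper exactly as user code would. As a usage note, a hedged sketch of materializing the partitioned asset from the last test (the partition key format and the database path are assumptions, not taken from the diff):

    from dagster import materialize

    result = materialize(
        [daily_partitioned],
        partition_key="2022-01-01",  # assumed daily partition key format
        resources={
            "io_manager": duckdb_pyspark_io_manager.configured({"database": "unit_test.duckdb"})
        },
    )
    assert result.success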
