Skip to content

Commit

Permalink
Direct invocation of assets for unit testing (#6761)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Feb 25, 2022
1 parent ee07c9c commit ef3ad62
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 2 deletions.
66 changes: 66 additions & 0 deletions docs/content/concepts/assets/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,61 @@ Multiple jobs within the same repository can target overlapping sets of assets.

Like regular jobs, asset jobs can be placed on [schedules](/concepts/partitions-schedules-sensors/schedules) and [sensors](/concepts/partitions-schedules-sensors/sensors).

## Testing

When writing unit tests, you can treat the function decorated by `@asset` as a regular Python function.

Consider a simple asset with no upstream dependencies:

```python file=/concepts/assets/asset_testing.py startafter=start_simple_asset endbefore=end_simple_asset
@asset
def my_simple_asset():
    # No upstream dependencies, so the function takes no arguments.
    return [1, 2, 3]
```

When writing a unit test, you can directly invoke the decorated function.

```python file=/concepts/assets/asset_testing.py startafter=start_test_simple_asset endbefore=end_test_simple_asset
def test_my_simple_asset():
    """Invoke the asset directly and check the value it returns."""
    assert my_simple_asset() == [1, 2, 3]
```

If you have an asset with upstream dependencies:

```python file=/concepts/assets/asset_testing.py startafter=start_more_complex_asset endbefore=end_more_complex_asset
@asset
def more_complex_asset(my_simple_asset):
    # `my_simple_asset` is the upstream dependency; a test can pass any value for it.
    return my_simple_asset + [4, 5, 6]
```

You can manually provide values for those dependencies in your unit test. This allows you to test assets in isolation from one another.

```python file=/concepts/assets/asset_testing.py startafter=start_test_more_complex_asset endbefore=end_test_more_complex_asset
def test_more_complex_asset():
    """Supply the upstream value by hand so the asset is tested in isolation."""
    assert more_complex_asset([0]) == [0, 4, 5, 6]
```

If you use a context object in your function, `@asset` will provide the correct context during execution. When writing a unit test, you can mock it with <PyObject object="build_op_context" />. This works because, under the hood, the function decorated by `@asset` is an op.

Consider this asset that uses a resource:

```python file=/concepts/assets/asset_testing.py startafter=start_with_context_asset endbefore=end_with_context_asset
@asset(required_resource_keys={"foo"})
def uses_context(context):
    # "foo" must be declared as a required resource: during a real run Dagster
    # only exposes declared resource keys on `context.resources`.
    return context.resources.foo
```

When writing a unit test, use <PyObject object="build_op_context" /> to mock the `context` and provide values for testing.

```python file=/concepts/assets/asset_testing.py startafter=start_test_with_context_asset endbefore=end_test_with_context_asset
def test_uses_context():
    """Build a mock op context carrying the `foo` resource and pass it to the asset."""
    mock_context = build_op_context(resources={"foo": "bar"})
    assert uses_context(mock_context) == "bar"
```

## Examples

### Multi-component asset keys
Expand Down Expand Up @@ -334,3 +389,14 @@ def downstream_asset(upstream):
```

In this case, `ins={"upstream": AssetIn("upstream_asset")}` declares that the contents of the asset with the key `upstream_asset` will be provided to the function argument named `upstream`.

### Using context in assets

Since a software-defined asset contains an op, all of the typical functionality of an op (like the use of resources) is available to an asset.

```python file=/concepts/assets/asset_w_context.py startafter=start_w_context endbefore=end_w_context
@asset(required_resource_keys={"api"})
def my_asset(context):
    # fetches the contents of the asset through the required `api` resource
    return context.resources.api.fetch_table("my_asset")
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# pylint: disable=redefined-outer-name
from dagster import asset, build_op_context

# start_simple_asset


@asset
def my_simple_asset():
    # No upstream dependencies, so the function takes no arguments.
    return [1, 2, 3]


# end_simple_asset

# start_test_simple_asset


def test_my_simple_asset():
    """Invoke the asset directly and check the value it returns."""
    assert my_simple_asset() == [1, 2, 3]


# end_test_simple_asset

# start_more_complex_asset


@asset
def more_complex_asset(my_simple_asset):
    # `my_simple_asset` is the upstream dependency; a test can pass any value for it.
    return my_simple_asset + [4, 5, 6]


# end_more_complex_asset

# start_test_more_complex_asset


def test_more_complex_asset():
    """Supply the upstream value by hand so the asset is tested in isolation."""
    assert more_complex_asset([0]) == [0, 4, 5, 6]


# end_test_more_complex_asset

# start_with_context_asset


@asset(required_resource_keys={"foo"})
def uses_context(context):
    # "foo" must be declared as a required resource: during a real run Dagster
    # only exposes declared resource keys on `context.resources`.
    return context.resources.foo


# end_with_context_asset

# start_test_with_context_asset


def test_uses_context():
    """Build a mock op context carrying the `foo` resource and pass it to the asset."""
    mock_context = build_op_context(resources={"foo": "bar"})
    assert uses_context(mock_context) == "bar"


# end_test_with_context_asset
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dagster import asset

# start_w_context


@asset(required_resource_keys={"api"})
def my_asset(context):
    # fetches the contents of the asset through the required `api` resource
    return context.resources.api.fetch_table("my_asset")


# end_w_context
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/core/asset_defs/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def __init__(
self._partitions_def = partitions_def
self._partition_mappings = partition_mappings or {}

def __call__(self, *args, **kwargs):
    """Invoke the underlying op, forwarding every positional and keyword argument.

    Returns whatever the op invocation returns.
    """
    underlying_op = self._op
    return underlying_op(*args, **kwargs)

@property
def op(self) -> OpDefinition:
    """Return the op definition held by this object (the same op `__call__` invokes)."""
    return self._op
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import pytest
from dagster import AssetKey, DagsterInvalidDefinitionError, Out, Output, String, check
from dagster.core.asset_defs import AssetIn, AssetsDefinition, asset, multi_asset
from dagster import (
AssetKey,
DagsterInvalidDefinitionError,
OpExecutionContext,
Out,
Output,
String,
build_op_context,
check,
)
from dagster.core.asset_defs import AssetIn, AssetsDefinition, asset, build_assets_job, multi_asset
from dagster.core.asset_defs.decorators import ASSET_DEPENDENCY_METADATA_KEY


Expand Down Expand Up @@ -245,3 +254,57 @@ def my_asset(_input1: str):
pass

assert my_asset.op.input_defs[0].dagster_type.display_name == "String"


def test_invoking_simple_assets():
    """Directly invoke assets with no inputs, positional inputs, and keyword inputs."""

    @asset
    def no_input_asset():
        return [1, 2, 3]

    assert no_input_asset() == [1, 2, 3]

    @asset
    def arg_input_asset(arg1, arg2):
        return arg1 + arg2

    combined = arg_input_asset([1, 2, 3], [4, 5, 6])
    assert combined == [1, 2, 3, 4, 5, 6]

    # The list default is never mutated (`+` builds a new list), so sharing it
    # across calls is harmless here.
    @asset
    def arg_kwarg_asset(arg1, kwarg1=[0]):
        return arg1 + kwarg1

    # Explicit keyword value overrides the default.
    assert arg_kwarg_asset([1, 2, 3], kwarg1=[3, 2, 1]) == [1, 2, 3, 3, 2, 1]

    # Omitting kwarg1 falls back to the default.
    assert arg_kwarg_asset([1, 2, 3]) == [1, 2, 3, 0]


def test_invoking_asset_with_deps():
    """An asset with an upstream dependency runs in a job and can also be invoked directly."""

    @asset
    def upstream():
        return [1]

    @asset
    def downstream(upstream):
        return upstream + [2, 3]

    # check that the asset dependencies are in place
    assets_job = build_assets_job("foo", [upstream, downstream])
    run_result = assets_job.execute_in_process()
    assert run_result.success

    # Direct invocation takes the value passed in, not the output of `upstream`.
    assert downstream([3]) == [3, 2, 3]


def test_invoking_asset_with_context():
    """A context built with `build_op_context` reaches the asset as an OpExecutionContext."""

    @asset
    def asset_with_context(context, arg1):
        assert isinstance(context, OpExecutionContext)
        return arg1

    # The context is passed first, followed by the asset's regular input.
    assert asset_with_context(build_op_context(), 1) == 1

1 comment on commit ef3ad62

@vercel
Copy link

@vercel vercel bot commented on ef3ad62 Feb 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.