Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/dagster-io/dagster into r…
Browse files Browse the repository at this point in the history
…elease-0.15.0
  • Loading branch information
clairelin135 committed Jun 14, 2022
2 parents a7e75eb + c97ddaa commit b5106af
Show file tree
Hide file tree
Showing 18 changed files with 212 additions and 85 deletions.
28 changes: 23 additions & 5 deletions docs/content/concepts/repositories-workspaces/repositories.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description: A repository is a collection of jobs, schedules, and sensor definit

# Repositories

A repository is a collection of jobs, schedules, and sensor definitions that the Dagster CLI, Dagit and the Dagster Daemon can target to load them.
A repository is a collection of software-defined assets, jobs, schedules, and sensors. Repositories are loaded as a unit by the Dagster CLI, Dagit and the Dagster Daemon.

## Relevant APIs

Expand All @@ -18,7 +18,7 @@ A repository is a collection of jobs, schedules, and sensor definitions that the

A repository is a convenient way to organize your job and other definitions. Each repository:

- Includes various definitions: [Jobs](/concepts/ops-jobs-graphs/jobs-graphs), [Schedules](/concepts/partitions-schedules-sensors/schedules), [Sensors](/concepts/partitions-schedules-sensors/sensors).
- Includes various definitions: [Software-defined assets](/concepts/assets/software-defined-assets), [Jobs](/concepts/ops-jobs-graphs/jobs-graphs), [Schedules](/concepts/partitions-schedules-sensors/schedules), and [Sensors](/concepts/partitions-schedules-sensors/sensors).
- Is loaded in a different process than Dagster system processes like Dagit. Any communication between the Dagster system and repository code occurs over an RPC mechanism, ensuring that problems in repository code can't affect Dagster or other repositories.
- Can be loaded in its own Python environment, so you can manage your dependencies (or even your own Python versions) separately.

Expand All @@ -31,7 +31,22 @@ You can set up multiple repositories and load them all at once by creating a `wo
Repositories are typically declared using the <PyObject object="repository" decorator /> decorator. For example:

```python file=/concepts/repositories_workspaces/repository_definition.py
from dagster import RunRequest, ScheduleDefinition, job, op, repository, sensor
from dagster import RunRequest, ScheduleDefinition, asset, job, op, repository, sensor


@asset
def asset1():
pass


@asset
def asset2():
pass


@asset(group_name="mygroup")
def asset3():
pass


@op
Expand Down Expand Up @@ -67,13 +82,16 @@ def job2_sensor():
@repository
def my_repository():
return [
asset1,
asset2,
asset3,
job1_schedule,
job2_sensor,
job3,
]
```

The repository specifies a list of items, each of which can be a <PyObject object="JobDefinition"/>, <PyObject module="dagster" object="ScheduleDefinition" />, or <PyObject module="dagster" object="SensorDefinition" />. If you include a schedule or sensor, the job it targets will be automatically also included on the repository.
The repository specifies a list of items, each of which can be a <PyObject object="AssetsDefinition"/>, <PyObject object="JobDefinition"/>, <PyObject module="dagster" object="ScheduleDefinition" />, or <PyObject module="dagster" object="SensorDefinition" />. If you include a schedule or sensor, the job it targets will be automatically also included on the repository.

## Using a Repository

Expand All @@ -83,7 +101,7 @@ If you save the code above as `repo.py`, you can then run the Dagster command li
dagit -f repo.py
```

Now you can see that all jobs in this repository are on the left:
Now you can see that all the assets and jobs in this repository are listed in the left sidebar. Assets are organized in groups. In our example, `asset1` and `asset2` are placed in the `default` group because they were not explicitly assigned a group. `asset3` is in `mygroup`.

<Image
alt="repo-dagit"
Expand Down
Binary file modified docs/next/public/images/concepts/repo-dagit.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions docs/screenshot_capture/screenshots.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
url: http://127.0.0.1:3000/
width: 1536
height: 1152
steps:
- open left sidebar

- path: concepts/assets/software-defined-assets/catalog.png
defs_file: examples/docs_snippets/docs_snippets/concepts/assets/asset_group.py
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
from dagster import RunRequest, ScheduleDefinition, job, op, repository, sensor
from dagster import RunRequest, ScheduleDefinition, asset, job, op, repository, sensor


@asset
def asset1():
pass


@asset
def asset2():
pass


@asset(group_name="mygroup")
def asset3():
pass


@op
Expand Down Expand Up @@ -34,6 +49,9 @@ def job2_sensor():
@repository
def my_repository():
return [
asset1,
asset2,
asset3,
job1_schedule,
job2_sensor,
job3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def test_jobs():

def test_my_repository():
assert my_repository
assert len(my_repository.get_all_pipelines()) == 3
assert len(my_repository.get_all_pipelines()) == 4
assert len(my_repository.schedule_defs) == 1
assert len(my_repository.sensor_defs) == 1
assert len(my_repository.get_all_jobs()) == 3
assert len(my_repository.get_all_jobs()) == 4


def test_my_lazy_repository():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from .pandas_io_manager import pandas_io_manager

airbyte_assets = build_airbyte_assets(
connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"]
connection_id=AIRBYTE_CONNECTION_ID,
destination_tables=["orders", "users"],
asset_key_prefix=["postgres_replica"],
)

dbt_assets = load_assets_from_dbt_project(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dagster._check as check
from dagster.core.asset_defs.assets import AssetsDefinition
from dagster.core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster.core.errors import DagsterInvalidSubsetError
from dagster.core.selector.subset_selector import (
fetch_connected,
generate_asset_dep_graph,
Expand Down Expand Up @@ -132,7 +133,15 @@ def _resolve(self, node: AssetSelection) -> AbstractSet[str]:
[_match_groups(assets_def, set(node.children)) for assets_def in self.all_assets],
)
elif isinstance(node, KeysAssetSelection):
return set([child.to_user_string() for child in node.children])
specified_keys = set([child.to_user_string() for child in node.children])
invalid_keys = specified_keys - set(self.all_assets_by_name.keys())
if invalid_keys:
raise DagsterInvalidSubsetError(
f"AssetKey(s) {invalid_keys} were selected, but no AssetDefinition objects supply "
"these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions "
"are correctly added to the repository."
)
return specified_keys
elif isinstance(node, OrAssetSelection):
child_1, child_2 = [self._resolve(child) for child in node.children]
return child_1 | child_2
Expand Down
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/core/asset_defs/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin
can_subset=self.can_subset,
selected_asset_keys=selected_asset_keys & self.keys,
resource_defs=self.resource_defs,
group_names_by_key=self.group_names_by_key,
)

def to_source_assets(self) -> Sequence[SourceAsset]:
Expand Down Expand Up @@ -468,7 +469,7 @@ def with_resources(self, resource_defs: Mapping[str, ResourceDefinition]) -> "As
selected_asset_keys=self._selected_asset_keys,
can_subset=self._can_subset,
resource_defs=relevant_resource_defs,
group_names_by_key=self._group_names_by_key,
group_names_by_key=self.group_names_by_key,
)


Expand Down
19 changes: 16 additions & 3 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def asset(
metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset.
required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op.
io_manager_key (Optional[str]): The resource key of the IOManager used
for storing the output of the op as an asset, and for loading it in downstream ops (default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided.
for storing the output of the op as an asset, and for loading it in downstream ops
(default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided.
io_manager_def (Optional[IOManagerDefinition]): The definition of the IOManager used for
storing the output of the op as an asset, and for loading it in
downstream ops. Only one of io_manager_def and io_manager_key can be provided.
Expand Down Expand Up @@ -157,7 +158,15 @@ def my_asset(my_upstream_asset: int) -> int:
return _Asset()(name)

key_prefix = canonicalize_backcompat_args(
key_prefix, "key_prefix", namespace, "namespace", "0.16.0"
key_prefix,
"key_prefix",
namespace,
"namespace",
"0.16.0",
additional_warn_txt="key_prefix applies only to the output AssetKey. If you want to modify "
"the prefix of the input AssetKeys as well, you can do this by explicitly setting the ins "
"parameter of this asset to a dictionary of the form "
"'input_name': AssetIn(key_prefix=...).",
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
Expand All @@ -168,6 +177,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return _Asset(
name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str])
key_prefix=key_prefix,
namespace=namespace,
ins=ins,
non_argument_deps=_make_asset_keys(non_argument_deps),
metadata=metadata,
Expand All @@ -191,6 +201,7 @@ class _Asset:
def __init__(
self,
name: Optional[str] = None,
namespace: Optional[Sequence[str]] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
non_argument_deps: Optional[Set[AssetKey]] = None,
Expand All @@ -209,6 +220,7 @@ def __init__(
):
self.name = name

self.namespace = namespace
if isinstance(key_prefix, str):
key_prefix = [key_prefix]
self.key_prefix = key_prefix
Expand Down Expand Up @@ -236,7 +248,8 @@ def __init__(
def __call__(self, fn: Callable) -> AssetsDefinition:
asset_name = self.name or fn.__name__

asset_ins = build_asset_ins(fn, self.key_prefix, self.ins or {}, self.non_argument_deps)
# for backcompat, we prefix input asset keys with the namespace
asset_ins = build_asset_ins(fn, self.namespace, self.ins or {}, self.non_argument_deps)

out_asset_key = AssetKey(list(filter(None, [*(self.key_prefix or []), asset_name])))
with warnings.catch_warnings():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,14 +723,6 @@ def _subset_assets_defs(
"asset keys produced by this asset."
)

missed_keys = selected_asset_keys - included_keys - {sa.key for sa in source_assets}
if missed_keys:
raise DagsterInvalidSubsetError(
f"When building job, the AssetKey(s) {[key.to_user_string() for key in missed_keys]} "
"were selected, but are not produced by any of the provided AssetsDefinitions or "
"SourceAssets. Make sure that keys are spelled correctly and that all of the expected "
"definitions are provided."
)
all_excluded_assets: Sequence[Union["AssetsDefinition", "SourceAsset"]] = [
*excluded_assets,
*source_assets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from dagster import (
AssetKey,
AssetOut,
AssetsDefinition,
IOManager,
Out,
Output,
ResourceDefinition,
build_op_context,
io_manager,
op,
)
from dagster._check import CheckError
from dagster.core.asset_defs import AssetGroup, AssetIn, SourceAsset, asset, multi_asset
Expand Down Expand Up @@ -92,6 +95,23 @@ def bar():
assert replaced.group_names_by_key[AssetKey("baz")] == "foo"


def test_retain_group_subset():
@op(out={"a": Out(), "b": Out()})
def ma_op():
return 1

ma = AssetsDefinition(
node_def=ma_op,
keys_by_input_name={},
keys_by_output_name={"a": AssetKey("a"), "b": AssetKey("b")},
group_names_by_key={AssetKey("a"): "foo", AssetKey("b"): "bar"},
can_subset=True,
)

subset = ma.subset_for({AssetKey("b")})
assert subset.group_names_by_key[AssetKey("b")] == "bar"


def test_chain_replace_and_subset_for():
@multi_asset(
outs={"a": AssetOut(), "b": AssetOut(), "c": AssetOut()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ def my_asset(arg1):
assert isinstance(my_asset, AssetsDefinition)
assert len(my_asset.op.output_defs) == 1
assert len(my_asset.op.input_defs) == 1
assert AssetKey(["my_prefix", "arg1"]) in my_asset.keys_by_input_name.values()
# this functions differently than the namespace arg in this scenario
assert AssetKey(["my_prefix", "arg1"]) not in my_asset.keys_by_input_name.values()
assert AssetKey(["arg1"]) in my_asset.keys_by_input_name.values()


def test_asset_with_context_arg():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from dagster._check import CheckError
from dagster.core.asset_defs import asset, multi_asset
from dagster.core.asset_defs.load_assets_from_modules import prefix_assets
from dagster.core.errors import DagsterInvalidSubsetError
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError
from dagster.core.execution.with_resources import with_resources
from dagster.core.test_utils import instance_for_test

Expand Down Expand Up @@ -173,22 +173,22 @@ def final(a, d):
(
"x",
False,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(
"x",
True,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(
["start", "x"],
False,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(
["start", "x"],
True,
(DagsterInvalidSubsetError, r"When building job, the AssetKey\(s\) \['x'\]"),
(DagsterInvalidSubsetError, r"AssetKey\(s\) {'x'} were selected"),
),
(["d", "e", "f"], False, None),
(["d", "e", "f"], True, None),
Expand Down Expand Up @@ -413,7 +413,7 @@ def a(source):
def b(a):
return a + 1

with pytest.raises(DagsterInvalidSubsetError, match="SourceAsset"):
with pytest.raises(DagsterInvalidDefinitionError, match="sources"):
define_asset_job("job", selection="*b").resolve(assets=[a, b], source_assets=[])


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ def _fails():

def test_bad_resolve():

with pytest.raises(DagsterInvalidSubsetError, match="When building job"):
with pytest.raises(DagsterInvalidSubsetError, match=r"AssetKey\(s\) {'foo'} were selected"):

@repository
def _fails():
Expand Down

0 comments on commit b5106af

Please sign in to comment.