Skip to content

Commit

Permalink
Silence experimental warnings triggered by internal code (#6873)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Mar 14, 2022
1 parent 43cf866 commit 396da6c
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 80 deletions.
39 changes: 23 additions & 16 deletions python_modules/dagster/dagster/core/asset_defs/asset_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import pkgutil
import re
import warnings
from importlib import import_module
from types import ModuleType
from typing import (
Expand All @@ -23,6 +24,7 @@
from dagster.core.definitions.events import AssetKey
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.utils import merge_dicts
from dagster.utils.backcompat import ExperimentalWarning

from ..definitions.executor_definition import ExecutorDefinition
from ..definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -213,13 +215,15 @@ def build_job(
executor_def = check.opt_inst_param(executor_def, "executor_def", ExecutorDefinition)
description = check.opt_str_param(description, "description")

mega_job_def = build_assets_job(
name=name,
assets=self.assets,
source_assets=self.source_assets,
resource_defs=self.resource_defs,
executor_def=self.executor_def,
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
mega_job_def = build_assets_job(
name=name,
assets=self.assets,
source_assets=self.source_assets,
resource_defs=self.resource_defs,
executor_def=self.executor_def,
)

if selection:
op_selection = self._parse_asset_selection(selection, job_name=name)
Expand All @@ -245,15 +249,18 @@ def build_job(
# accidentally add to the original list
excluded_assets = list(self.source_assets)

return build_assets_job(
name=name,
assets=included_assets,
source_assets=excluded_assets,
resource_defs=self.resource_defs,
executor_def=self.executor_def,
description=description,
tags=tags,
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
asset_job = build_assets_job(
name=name,
assets=included_assets,
source_assets=excluded_assets,
resource_defs=self.resource_defs,
executor_def=self.executor_def,
description=description,
tags=tags,
)
return asset_job

def _parse_asset_selection(self, selection: Union[str, List[str]], job_name: str) -> List[str]:
"""Convert selection over asset keys to selection over ops"""
Expand Down
73 changes: 39 additions & 34 deletions python_modules/dagster/dagster/core/asset_defs/assets_job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import itertools
import warnings
from typing import AbstractSet, Any, Dict, Mapping, Optional, Sequence, Tuple, Union, cast

from dagster import check
Expand All @@ -23,7 +24,7 @@
from dagster.core.execution.context.output import build_output_context
from dagster.core.storage.fs_asset_io_manager import fs_asset_io_manager
from dagster.core.storage.root_input_manager import RootInputManagerDefinition, root_input_manager
from dagster.utils.backcompat import experimental
from dagster.utils.backcompat import ExperimentalWarning, experimental
from dagster.utils.merger import merge_dicts

from .asset import AssetsDefinition
Expand Down Expand Up @@ -239,39 +240,43 @@ def build_root_manager(
source_asset_io_manager_keys = {
source_asset.io_manager_key for source_asset in source_assets_by_key.values()
}
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)

@root_input_manager(required_resource_keys=source_asset_io_manager_keys)
def _root_manager(input_context: InputContext) -> Any:
source_asset_key = cast(AssetKey, input_context.asset_key)
source_asset = source_assets_by_key[source_asset_key]
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)

@op(out={source_asset_key.path[-1]: Out(asset_key=source_asset_key)})
def _op():
pass

resource_config = input_context.step_context.resolved_run_config.resources[
source_asset.io_manager_key
].config

output_context = build_output_context(
name=source_asset_key.path[-1],
step_key="none",
solid_def=_op,
metadata=cast(Dict[str, Any], source_asset.metadata),
resource_config=resource_config,
)
input_context_with_upstream = build_input_context(
name=input_context.name,
metadata=input_context.metadata,
config=input_context.config,
dagster_type=input_context.dagster_type,
upstream_output=output_context,
op_def=input_context.op_def,
step_context=input_context.step_context,
resource_config=resource_config,
)

@root_input_manager(required_resource_keys=source_asset_io_manager_keys)
def _root_manager(input_context: InputContext) -> Any:
source_asset_key = cast(AssetKey, input_context.asset_key)
source_asset = source_assets_by_key[source_asset_key]

@op(out={source_asset_key.path[-1]: Out(asset_key=source_asset_key)})
def _op():
pass

resource_config = input_context.step_context.resolved_run_config.resources[
source_asset.io_manager_key
].config

output_context = build_output_context(
name=source_asset_key.path[-1],
step_key="none",
solid_def=_op,
metadata=cast(Dict[str, Any], source_asset.metadata),
resource_config=resource_config,
)
input_context_with_upstream = build_input_context(
name=input_context.name,
metadata=input_context.metadata,
config=input_context.config,
dagster_type=input_context.dagster_type,
upstream_output=output_context,
op_def=input_context.op_def,
step_context=input_context.step_context,
resource_config=resource_config,
)

io_manager = getattr(cast(Any, input_context.resources), source_asset.io_manager_key)
return io_manager.load_input(input_context_with_upstream)
io_manager = getattr(cast(Any, input_context.resources), source_asset.io_manager_key)
return io_manager.load_input(input_context_with_upstream)

return _root_manager
51 changes: 28 additions & 23 deletions python_modules/dagster/dagster/core/asset_defs/decorators.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import (
AbstractSet,
Any,
Expand All @@ -24,7 +25,7 @@
from dagster.core.definitions.utils import NoValueSentinel
from dagster.core.errors import DagsterInvalidDefinitionError
from dagster.core.types.dagster_type import DagsterType
from dagster.utils.backcompat import experimental_decorator
from dagster.utils.backcompat import ExperimentalWarning, experimental_decorator

from .asset import AssetsDefinition
from .asset_in import AssetIn
Expand Down Expand Up @@ -189,20 +190,22 @@ def partition_fn(context): # pylint: disable=function-redefined
asset_partitions_def=self.partitions_def,
asset_partitions=partition_fn,
)
op = _Op(
name="__".join(out_asset_key.path),
description=self.description,
ins=asset_ins,
out=out,
required_resource_keys=self.required_resource_keys,
tags={"kind": self.compute_kind} if self.compute_kind else None,
config_schema={
"assets": {
"input_partitions": Field(dict, is_required=False),
"output_partitions": Field(dict, is_required=False),
}
},
)(fn)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
op = _Op(
name="__".join(out_asset_key.path),
description=self.description,
ins=asset_ins,
out=out,
required_resource_keys=self.required_resource_keys,
tags={"kind": self.compute_kind} if self.compute_kind else None,
config_schema={
"assets": {
"input_partitions": Field(dict, is_required=False),
"output_partitions": Field(dict, is_required=False),
}
},
)(fn)

# NOTE: we can `cast` below because we know the Ins returned by `build_asset_ins` always
# have a plain AssetKey asset key. Dynamic asset keys will be deprecated in 0.15.0, when
Expand Down Expand Up @@ -272,14 +275,16 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
asset_ins = build_asset_ins(fn, None, ins or {}, non_argument_deps)
asset_outs = build_asset_outs(op_name, outs, asset_ins, internal_asset_deps or {})

op = _Op(
name=op_name,
description=description,
ins=asset_ins,
out=asset_outs,
required_resource_keys=required_resource_keys,
tags={"kind": compute_kind} if compute_kind else None,
)(fn)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
op = _Op(
name=op_name,
description=description,
ins=asset_ins,
out=asset_outs,
required_resource_keys=required_resource_keys,
tags={"kind": compute_kind} if compute_kind else None,
)(fn)

# NOTE: we can `cast` below because we know the Ins returned by `build_asset_ins` always
# have a plain AssetKey asset key. Dynamic asset keys will be deprecated in 0.15.0, when
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from abc import ABC, abstractmethod
from types import FunctionType
from typing import (
Expand All @@ -19,6 +20,7 @@
from dagster.core.asset_defs.source_asset import SourceAsset
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster.utils import merge_dicts
from dagster.utils.backcompat import ExperimentalWarning

from .events import AssetKey
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
Expand Down Expand Up @@ -692,17 +694,20 @@ def from_list(

elif isinstance(definition, AssetGroup):
asset_group = definition

if asset_group.all_assets_job_name() in pipelines_or_jobs:
raise DagsterInvalidDefinitionError(
"When constructing repository, attempted to pass multiple AssetGroups. There can only be one AssetGroup per repository."
)
pipelines_or_jobs[asset_group.all_assets_job_name()] = build_assets_job(
asset_group.all_assets_job_name(),
assets=asset_group.assets,
source_assets=asset_group.source_assets,
resource_defs=asset_group.resource_defs,
executor_def=asset_group.executor_def,
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ExperimentalWarning)
pipelines_or_jobs[asset_group.all_assets_job_name()] = build_assets_job(
asset_group.all_assets_job_name(),
assets=asset_group.assets,
source_assets=asset_group.source_assets,
resource_defs=asset_group.resource_defs,
executor_def=asset_group.executor_def,
)
source_assets = {
source_asset.key: source_asset for source_asset in asset_group.source_assets
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import warnings

import pytest

from dagster import (
Expand All @@ -16,6 +18,20 @@
from dagster.core.asset_defs import AssetGroup, AssetIn, SourceAsset, asset, multi_asset


@pytest.fixture(autouse=True)
def check_experimental_warnings():
with warnings.catch_warnings(record=True) as record:
yield

raises_warning = False
for w in record:
if "build_assets_job" in w.message.args[0] or "root_input_manager" in w.message.args[0]:
raises_warning = True
break

assert not raises_warning


def test_asset_group_from_list():
@asset
def asset_foo():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import warnings

import pytest

from dagster import (
Expand All @@ -12,6 +14,21 @@
)
from dagster.core.asset_defs import AssetIn, AssetsDefinition, asset, build_assets_job, multi_asset
from dagster.core.asset_defs.decorators import ASSET_DEPENDENCY_METADATA_KEY
from dagster.utils.backcompat import ExperimentalWarning


@pytest.fixture(autouse=True)
def check_experimental_warnings():
with warnings.catch_warnings(record=True) as record:
yield

raises_warning = False
for w in record:
if "asset_key" in w.message.args[0]:
raises_warning = True
break

assert not raises_warning


def test_asset_no_decorator_args():
Expand Down

0 comments on commit 396da6c

Please sign in to comment.