Skip to content

Commit

Permalink
Workaround for mypy cache bug (#7732)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed May 4, 2022
1 parent 69c57d3 commit 97ed20f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections import namedtuple
from functools import update_wrapper
from typing import (
TYPE_CHECKING,
Expand All @@ -7,8 +6,6 @@
Callable,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Union,
cast,
Expand All @@ -19,11 +16,7 @@
from dagster.core.decorator_utils import format_docstring_for_description
from dagster.core.definitions.config import is_callable_valid_config_arg
from dagster.core.definitions.configurable import AnonymousConfigurableDefinition
from dagster.core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterUnknownResourceError,
)
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster.seven import funcsigs
from dagster.utils.backcompat import experimental_arg_warning

Expand All @@ -39,6 +32,13 @@
)
from .resource_invocation import resource_invocation_result

# pylint: disable=unused-import
from .scoped_resources_builder import ( # type: ignore
IContainsGenerator,
Resources,
ScopedResourcesBuilder,
)

if TYPE_CHECKING:
from dagster.core.execution.resources_init import InitResourceContext

Expand Down Expand Up @@ -325,94 +325,6 @@ def _wrap(resource_fn: Callable[["InitResourceContext"], Any]) -> "ResourceDefin
return _wrap


class Resources:
"""This class functions as a "tag" that we can use to type the namedtuple returned by
ScopedResourcesBuilder.build(). The way that we create the namedtuple returned by build() is
incompatible with type annotations on its own due to its dynamic attributes, so this tag class
provides a workaround."""


class IContainsGenerator:
"""This class adds an additional tag to indicate that the resources object has at least one
resource that has been yielded from a generator, and thus may require teardown."""


class ScopedResourcesBuilder(
NamedTuple(
"_ScopedResourcesBuilder",
[("resource_instance_dict", Mapping[str, object]), ("contains_generator", bool)],
)
):
"""There are concepts in the codebase (e.g. ops, system storage) that receive
only the resources that they have specified in required_resource_keys.
ScopedResourcesBuilder is responsible for dynamically building a class with
only those required resources and returning an instance of that class."""

def __new__(
cls,
resource_instance_dict: Optional[Mapping[str, object]] = None,
contains_generator: bool = False,
):
return super(ScopedResourcesBuilder, cls).__new__(
cls,
resource_instance_dict=check.opt_dict_param(
resource_instance_dict, "resource_instance_dict", key_type=str
),
contains_generator=contains_generator,
)

def build(self, required_resource_keys: Optional[AbstractSet[str]]) -> Resources:

"""We dynamically create a type that has the resource keys as properties, to enable dotting into
the resources from a context.
For example, given:
resources = {'foo': <some resource>, 'bar': <some other resource>}
then this will create the type Resource(namedtuple('foo bar'))
and then binds the specified resources into an instance of this object, which can be consumed
as, e.g., context.resources.foo.
"""
required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys", of_type=str
)
# it is possible that the surrounding context does NOT have the required resource keys
# because we are building a context for steps that we are not going to execute (e.g. in the
# resume/retry case, in order to generate copy intermediates events)
resource_instance_dict = {
key: self.resource_instance_dict[key]
for key in required_resource_keys
if key in self.resource_instance_dict
}

# If any of the resources are generators, add the IContainsGenerator subclass to flag that
# this is the case.
if self.contains_generator:

class _ScopedResourcesContainsGenerator(
namedtuple("_ScopedResourcesContainsGenerator", list(resource_instance_dict.keys())), # type: ignore[misc]
Resources,
IContainsGenerator,
):
def __getattr__(self, attr):
raise DagsterUnknownResourceError(attr)

return _ScopedResourcesContainsGenerator(**resource_instance_dict) # type: ignore[call-arg]

else:

class _ScopedResources(
namedtuple("_ScopedResources", list(resource_instance_dict.keys())), # type: ignore[misc]
Resources,
):
def __getattr__(self, attr):
raise DagsterUnknownResourceError(attr)

return _ScopedResources(**resource_instance_dict) # type: ignore[call-arg]


def make_values_resource(**kwargs: Any) -> ResourceDefinition:
"""A helper function that creates a ``ResourceDefinition`` to take in user-defined values.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# type: ignore

# NOTE: This file has been factored out from "resource_definition" and type-ignored due to a mypy bug
# when processing dynamically generated namedtuples. This code corrupts the mypy cache and causes
# mypy to fail. When mypy fixes this bug, the type-ignore can be removed and the file contents
# folded back into "resource_definition".
#
# See: https://github.com/python/mypy/issues/7281

from collections import namedtuple
from typing import AbstractSet, Mapping, NamedTuple, Optional

import dagster.check as check
from dagster.core.errors import DagsterUnknownResourceError


class IContainsGenerator:
"""This class adds an additional tag to indicate that the resources object has at least one
resource that has been yielded from a generator, and thus may require teardown."""


class Resources:
"""This class functions as a "tag" that we can use to type the namedtuple returned by
ScopedResourcesBuilder.build(). The way that we create the namedtuple returned by build() is
incompatible with type annotations on its own due to its dynamic attributes, so this tag class
provides a workaround."""


class ScopedResourcesBuilder(
NamedTuple(
"_ScopedResourcesBuilder",
[("resource_instance_dict", Mapping[str, object]), ("contains_generator", bool)],
)
):
"""There are concepts in the codebase (e.g. ops, system storage) that receive
only the resources that they have specified in required_resource_keys.
ScopedResourcesBuilder is responsible for dynamically building a class with
only those required resources and returning an instance of that class."""

def __new__(
cls,
resource_instance_dict: Optional[Mapping[str, object]] = None,
contains_generator: bool = False,
):
return super(ScopedResourcesBuilder, cls).__new__(
cls,
resource_instance_dict=check.opt_dict_param(
resource_instance_dict, "resource_instance_dict", key_type=str
),
contains_generator=contains_generator,
)

def build(self, required_resource_keys: Optional[AbstractSet[str]]) -> Resources:

"""We dynamically create a type that has the resource keys as properties, to enable dotting into
the resources from a context.
For example, given:
resources = {'foo': <some resource>, 'bar': <some other resource>}
then this will create the type Resource(namedtuple('foo bar'))
and then binds the specified resources into an instance of this object, which can be consumed
as, e.g., context.resources.foo.
"""
required_resource_keys = check.opt_set_param(
required_resource_keys, "required_resource_keys", of_type=str
)
# it is possible that the surrounding context does NOT have the required resource keys
# because we are building a context for steps that we are not going to execute (e.g. in the
# resume/retry case, in order to generate copy intermediates events)
resource_instance_dict = {
key: self.resource_instance_dict[key]
for key in required_resource_keys
if key in self.resource_instance_dict
}

# If any of the resources are generators, add the IContainsGenerator subclass to flag that
# this is the case.
if self.contains_generator:

class _ScopedResourcesContainsGenerator(
namedtuple("_ScopedResourcesContainsGenerator", list(resource_instance_dict.keys())), # type: ignore[misc]
Resources,
IContainsGenerator,
):
def __getattr__(self, attr):
raise DagsterUnknownResourceError(attr)

return _ScopedResourcesContainsGenerator(**resource_instance_dict) # type: ignore[call-arg]

else:

class _ScopedResources(
namedtuple("_ScopedResources", list(resource_instance_dict.keys())), # type: ignore[misc]
Resources,
):
def __getattr__(self, attr):
raise DagsterUnknownResourceError(attr)

return _ScopedResources(**resource_instance_dict) # type: ignore[call-arg]

0 comments on commit 97ed20f

Please sign in to comment.