/
dagster_type.py
62 lines (48 loc) · 2 KB
/
dagster_type.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import typing
from dagster import check
from dagster.core.errors import DagsterInvalidDefinitionError
from .builtin_enum import BuiltinEnum
from .field_utils import Dict
from .typing_api import (
is_closed_python_dict_type,
is_closed_python_set_type,
is_closed_python_tuple_type,
)
from .wrapping import WrappingType
# Attribute name looked up on a class by is_runtime_type_decorated_klass; a
# class that has an attribute with this name is treated as runtime-type decorated.
MAGIC_RUNTIME_TYPE_NAME = '__runtime_type'
def is_runtime_type_decorated_klass(klass):
    '''Report whether ``klass`` carries the runtime-type marker attribute.

    Args:
        klass: The class to inspect. It is first passed to ``check.type_param``
            (presumably rejecting anything that is not a class -- confirm
            against ``dagster.check``).

    Returns:
        bool: True when ``klass`` has an attribute named by
        ``MAGIC_RUNTIME_TYPE_NAME``, False otherwise.
    '''
    check.type_param(klass, 'klass')
    has_marker = hasattr(klass, MAGIC_RUNTIME_TYPE_NAME)
    return has_marker
# Container types that check_dagster_type_param accepts as-is through a direct
# membership test, before any of the is_closed_python_*_type checks are tried.
OPEN_CONTAINER_TYPES = {dict, Dict, typing.Dict, tuple, typing.Tuple, set, typing.Set}
def check_dagster_type_param(dagster_type, param_name, base_type):
    '''Validate that ``dagster_type`` is an acceptable type argument and return it.

    The value is returned unchanged when it is any of the following, tested in
    this order:

    * ``None``
    * a member of ``OPEN_CONTAINER_TYPES``
    * accepted by ``is_closed_python_dict_type``, ``is_closed_python_tuple_type``
      or ``is_closed_python_set_type``
    * accepted by ``BuiltinEnum.contains``
    * an instance of ``WrappingType``
    * a class for which ``is_runtime_type_decorated_klass`` is true
    * a class that is a subclass of ``base_type``

    Args:
        dagster_type: The candidate value to validate.
        param_name: Name of the parameter being validated; used only to build
            error messages.
        base_type: Class that a plain (undecorated) class must subclass in
            order to be accepted. It is not itself validated here.

    Returns:
        ``dagster_type``, unmodified.

    Raises:
        DagsterInvalidDefinitionError: If ``dagster_type`` matched none of the
            special cases and is not a class, or is a class that is neither
            runtime-type decorated nor a subclass of ``base_type``.
    '''
    # Cannot check base_type because of circular references and no fwd declarations
    if dagster_type is None:
        return dagster_type

    # Set membership hashes its operand, so an unhashable argument (for example
    # a list or dict instance) would escape as a bare TypeError. An unhashable
    # value can never be a member of the set, so treat it as "not an open
    # container type" and let the checks below produce the proper error.
    try:
        if dagster_type in OPEN_CONTAINER_TYPES:
            return dagster_type
    except TypeError:
        pass

    if is_closed_python_dict_type(dagster_type):
        return dagster_type

    if is_closed_python_tuple_type(dagster_type):
        return dagster_type

    if is_closed_python_set_type(dagster_type):
        return dagster_type

    if BuiltinEnum.contains(dagster_type):
        return dagster_type

    if isinstance(dagster_type, WrappingType):
        return dagster_type

    # Everything accepted past this point must be a class, because the
    # remaining checks (marker attribute lookup, issubclass) require one.
    if not isinstance(dagster_type, type):
        raise DagsterInvalidDefinitionError(
            'Invalid type for "{param_name}": dagster_type is not an instance of "type", got {dagster_type}'.format(
                param_name=param_name, dagster_type=dagster_type
            )
        )

    if is_runtime_type_decorated_klass(dagster_type):
        return dagster_type

    if not issubclass(dagster_type, base_type):
        raise DagsterInvalidDefinitionError(
            (
                'Parameter {param_name} must be a valid dagster type: A builtin (e.g. String, Int, '
                'etc), a wrapping type (List or Optional), or a type class. Got {dagster_type}'
            ).format(param_name=param_name, dagster_type=dagster_type)
        )

    return dagster_type