Skip to content

Commit

Permalink
Add support for Unions
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim Avanov committed Jun 28, 2018
1 parent 0f5b11f commit 1e096ca
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 113 deletions.
30 changes: 29 additions & 1 deletion tests/test_parser.py
@@ -1,6 +1,6 @@
import json
from enum import Enum
from typing import NamedTuple, Dict, Any, Sequence
from typing import NamedTuple, Dict, Any, Sequence, Union

import colander
import pytest
Expand Down Expand Up @@ -72,6 +72,34 @@ class X(NamedTuple):
assert x.x == x.y['x']


def test_type_with_unions():
    """ Check that a Union-typed attribute is parsed into the variant
    whose fields are present in the payload, and that None is accepted.
    """
    class VariantA(NamedTuple):
        variant_a: int

    class VariantB(NamedTuple):
        variant_b: int
        variant_b_attr: int

    class X(NamedTuple):
        x: Union[None, VariantA, VariantB]

    MkX = p.type_constructor(X)

    # a payload carrying only VariantA's field must produce VariantA
    parsed_a: X = MkX({'x': {'variant_a': 1}})
    assert isinstance(parsed_a.x, VariantA)

    # a payload carrying both VariantB fields must produce VariantB
    parsed_b: X = MkX({'x': {'variant_b': 1, 'variant_b_attr': 1}})
    assert isinstance(parsed_b.x, VariantB)

    # an explicit None and an omitted attribute construct equal values
    assert MkX({'x': None}) == MkX({})

    # this is not the same as MkX({}):
    # the empty structure is passed as attribute x,
    # which should match with only an empty named tuple definition,
    # which is not the same as None.
    with pytest.raises(colander.Invalid):
        MkX({'x': {}})


def test_parser_github_pull_request_payload():
data = GITHUB_PR_PAYLOAD_JSON
github_pr_dict = json.loads(data)
Expand Down
145 changes: 33 additions & 112 deletions typeit/parser.py
@@ -1,4 +1,3 @@
import re
import enum as std_enum
from typing import (
Type, Tuple, Optional, Any, Union, List,
Expand All @@ -9,6 +8,9 @@
import colander as col
import typing_inspect as insp

from .utils import normalize_name, denormalize_name
from . import schema


def typeit(dictionary: Dict):
return construct_type('main', parse(dictionary))
Expand Down Expand Up @@ -142,44 +144,6 @@ def clarify_field_type_list(field_name: str,
}


NORMALIZATION_PREFIX = 'normalized__'

RESERVED_WORDS = {
'and', 'del', 'from',
'not', 'while','as',
'elif', 'global', 'or',
'with','assert', 'else',
'if', 'pass', 'yield',
'break', 'except', 'import',
'print', 'class', 'exec',
'in', 'raise', 'continue',
'finally', 'is', 'return',
'def', 'for', 'lambda', 'try',
}

NORMALIZED_RESERVED_WORDS = {
f'{NORMALIZATION_PREFIX}{x}' for x in RESERVED_WORDS
}


def normalize_name(name: str,
pattern=re.compile('^([_0-9]+).*$')) -> Tuple[str, bool]:
""" Some field name patterns are not allowed in NamedTuples
https://docs.python.org/3.7/library/collections.html#collections.namedtuple
"""
if name in RESERVED_WORDS or pattern.match(name):
return f'{NORMALIZATION_PREFIX}{name}', True
return name, False


def denormalize_name(name: str) -> Tuple[str, bool]:
""" Undo normalize_name()
"""
if name in NORMALIZED_RESERVED_WORDS or name.startswith(NORMALIZATION_PREFIX):
return name[len(NORMALIZATION_PREFIX):], True
return name, False


def construct_type(name: str, fields: List[Component]) -> NamedTuple:
"""
:param name: name of the type being constructed
Expand Down Expand Up @@ -208,20 +172,37 @@ def _maybe_node_for_builtin(typ) -> Optional[col.SchemaNode]:

def _maybe_node_for_enum(typ) -> Optional[col.SchemaNode]:
if issubclass(typ, std_enum.Enum):
return col.SchemaNode(Enum(typ, allow_empty=True))
return col.SchemaNode(schema.Enum(typ, allow_empty=True))
return None


def _maybe_node_for_optional(typ) -> Optional[col.SchemaNode]:
# typ is Optional[T] where T is either unknown Any or a concrete type
if typ is Optional[Any]:
def _maybe_node_for_union(typ) -> Optional[col.SchemaNode]:
""" handles cases where typ is a Union, including the special
case of Optional[Any], which is in essence Union[None, T]
where T is either unknown Any or a concrete type.
"""
if insp.get_origin(typ) is not Union:
return None

NoneClass = None.__class__
variants = insp.get_args(typ)
if variants in ((NoneClass, Any), (Any, NoneClass)):
# Case for Optional[Any] and Union[None, Any] notations
return col.SchemaNode(col.Str(allow_empty=True), missing=None)
elif insp.get_origin(typ) is Union:
inner = insp.get_last_args(typ)[0]
inner_node = decide_node_type(inner)
inner_node.missing = None
return inner_node
return None

allow_empty = NoneClass in variants
node_variants = []
for variant in variants:
if variant is NoneClass:
continue
node = decide_node_type(variant)
if allow_empty:
node.missing = None
node_variants.append(node)
union_node = col.SchemaNode(schema.UnionNode(variants=node_variants))
if allow_empty:
union_node.missing = None
return union_node


def _maybe_node_for_list(typ) -> Optional[col.SequenceSchema]:
Expand Down Expand Up @@ -258,7 +239,7 @@ def decide_node_type(typ) -> col.SchemaNode:
# is unable to narrow down `typ` to NamedTuple
# at line _node_for_type(typ)
node = (_maybe_node_for_builtin(typ) or
_maybe_node_for_optional(typ) or
_maybe_node_for_union(typ) or
_maybe_node_for_list(typ) or
_maybe_node_for_enum(typ) or
_maybe_node_for_dict(typ) or
Expand All @@ -267,7 +248,7 @@ def decide_node_type(typ) -> col.SchemaNode:


def _node_for_type(typ: Type[Tuple]) -> col.SchemaNode:
constructor = col.SchemaNode(Structure(typ))
constructor = col.SchemaNode(schema.Structure(typ))
for field_name, field_type in typ.__annotations__.items():
source_name, __ = denormalize_name(field_name)
node_type = decide_node_type(field_type)
Expand Down Expand Up @@ -320,69 +301,9 @@ def codegen(typ: Type[Tuple],
return '\n'.join(code)



class Int(col.Int):

def serialize(self, node, appstruct):
""" Default colander integer serializer returns a string representation
of a number, whereas we want identical representation of the original data.
"""
r = super().serialize(node, appstruct)
if r is col.null:
return r
return int(r)


class Enum(col.Str):
def __init__(self, enum: Type[std_enum.Enum], *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.enum = enum

def serialize(self, node, appstruct):
""" Default colander integer serializer returns a string representation
of a number, whereas we want identical representation of the original data.
"""
if appstruct is col.null:
return appstruct
r = super().serialize(node, appstruct.value)
return r

def deserialize(self, node, cstruct) -> std_enum.Enum:
r = super().deserialize(node, cstruct)
if r is col.null:
return r
try:
return self.enum(r)
except ValueError:
raise col.Invalid(node, f'Invalid variant of {self.enum.__name__}', cstruct)


class Structure(col.Mapping):

def __init__(self,
typ: Type[Tuple],
unknown: str = 'ignore') -> None:
super().__init__(unknown)
self.typ = typ

def deserialize(self, node, cstruct):
r = super().deserialize(node, cstruct)
if r is col.null:
return r
return self.typ(**{normalize_name(k)[0]: v for k, v in r.items()})

def serialize(self, node, appstruct: NamedTuple):
if appstruct is col.null:
return super().serialize(node, appstruct)
return super().serialize(
node,
{denormalize_name(k)[0]: v for k, v in appstruct._asdict().items()}
)


BUILTIN_TO_SCHEMA_TYPES = {
str: col.Str(allow_empty=True),
int: Int(),
int: schema.Int(),
float: col.Float(),
bool: col.Bool(),
}
104 changes: 104 additions & 0 deletions typeit/schema.py
@@ -0,0 +1,104 @@
import enum as std_enum
from typing import Type, Tuple, NamedTuple, Sequence

import colander as col

from .utils import normalize_name, denormalize_name


class Enum(col.Str):
    """ String-based schema type that converts between raw values
    and members of the enumeration class given at construction time.
    """

    def __init__(self, enum: Type[std_enum.Enum], *args, **kwargs) -> None:
        """
        :param enum: enumeration class used to build members on deserialization
        :param args: positional arguments forwarded to ``col.Str``
        :param kwargs: keyword arguments forwarded to ``col.Str``
        """
        super().__init__(*args, **kwargs)
        self.enum = enum

    def serialize(self, node, appstruct):
        """ Serialize an enumeration member by passing its ``value``
        (not the member itself) to the string serializer.

        ``col.null`` is returned unchanged.
        """
        if appstruct is col.null:
            return appstruct
        return super().serialize(node, appstruct.value)

    def deserialize(self, node, cstruct) -> std_enum.Enum:
        """ Deserialize ``cstruct`` as a string and look up the matching
        member of ``self.enum`` by value.

        ``col.null`` produced by the string deserializer is returned unchanged.

        :raises col.Invalid: when the enumeration rejects the value
        """
        r = super().deserialize(node, cstruct)
        if r is col.null:
            return r
        try:
            return self.enum(r)
        except ValueError as err:
            # chain explicitly so the original lookup failure stays visible
            # as the cause of the validation error
            raise col.Invalid(
                node, f'Invalid variant of {self.enum.__name__}', cstruct
            ) from err


class Structure(col.Mapping):
    """ Mapping schema type that deserializes into instances of ``typ``
    and serializes them back through ``_asdict()``.
    """

    def __init__(self, typ: Type[Tuple], unknown: str = 'ignore') -> None:
        """
        :param typ: tuple type instantiated from deserialized mappings
        :param unknown: passed through to ``col.Mapping``
        """
        super().__init__(unknown)
        self.typ = typ

    def deserialize(self, node, cstruct):
        """ Deserialize the mapping, then build ``self.typ`` from it
        using normalized key names as keyword arguments.
        """
        mapping = super().deserialize(node, cstruct)
        if mapping is col.null:
            return mapping
        kwargs = {}
        for key, value in mapping.items():
            field_name, __ = normalize_name(key)
            kwargs[field_name] = value
        return self.typ(**kwargs)

    def serialize(self, node, appstruct: NamedTuple):
        """ Serialize a tuple instance as a mapping keyed by
        denormalized field names; ``col.null`` is passed through as is.
        """
        if appstruct is not col.null:
            appstruct = {
                denormalize_name(field_name)[0]: value
                for field_name, value in appstruct._asdict().items()
            }
        return super().serialize(node, appstruct)


class UnionNode(col.Mapping):
    """ Mapping schema type that delegates deserialization to the first
    of its ``variants`` that accepts the provided data.
    """

    def __init__(self,
                 variants: Sequence[col.SchemaNode]) -> None:
        """
        :param variants: candidate schema nodes, tried in the given order
        """
        # unknown keys are preserved so that every variant
        # receives the complete mapping
        super().__init__(unknown='preserve')
        self.variants = variants

    def deserialize(self, node, cstruct):
        """ Return the result of the first variant that deserializes
        ``cstruct`` without raising ``col.Invalid``.

        ``None`` and ``col.null`` are returned unchanged.

        :raises col.Invalid: when no variant accepts the data
        """
        if cstruct is None:
            # explicitly passed None is not col.null
            # therefore we must handle it separately
            return cstruct

        # get the initial dictionary from our mapping base class
        r = super().deserialize(node, cstruct)
        if cstruct is col.null:
            return cstruct

        # next, iterate over available variants and return the first
        # matched structure.
        for variant in self.variants:
            try:
                return variant.deserialize(r)
            except col.Invalid:
                continue
        raise col.Invalid(node, 'None of the variants matches provided data', cstruct)

    def serialize(self, node, appstruct: NamedTuple):
        """ Serialize a tuple instance as a mapping keyed by
        denormalized field names.

        ``None`` is returned unchanged, mirroring ``deserialize()``;
        ``col.null`` is handed to the mapping base class.
        """
        if appstruct is None:
            # deserialize() yields None for an explicit None, and None has
            # no _asdict(), so it has to be short-circuited here
            return appstruct
        if appstruct is col.null:
            return super().serialize(node, appstruct)
        return super().serialize(
            node,
            {denormalize_name(k)[0]: v for k, v in appstruct._asdict().items()}
        )


class Int(col.Int):
    """ Integer schema type whose serialized form is converted back to ``int``. """

    def serialize(self, node, appstruct):
        """ Default colander integer serializer returns a string representation
        of a number, whereas we want identical representation of the original data.

        ``col.null`` from the base serializer is returned unchanged.
        """
        serialized = super().serialize(node, appstruct)
        return serialized if serialized is col.null else int(serialized)
42 changes: 42 additions & 0 deletions typeit/utils.py
@@ -0,0 +1,42 @@
import keyword
import re
from typing import Tuple


NORMALIZATION_PREFIX = 'normalized__'


# Names that are not allowed as NamedTuple fields because they are keywords.
# keyword.kwlist covers the running interpreter (including None, True, False,
# nonlocal, async and await); 'print' and 'exec' are kept from the previous
# hand-written list so that names normalized before keep the same form.
RESERVED_WORDS = set(keyword.kwlist) | {'print', 'exec'}


NORMALIZED_RESERVED_WORDS = {
    f'{NORMALIZATION_PREFIX}{x}' for x in RESERVED_WORDS
}


def normalize_name(name: str,
                   pattern=re.compile('^([_0-9]+).*$')) -> Tuple[str, bool]:
    """ Some field name patterns are not allowed in NamedTuples
    https://docs.python.org/3.7/library/collections.html#collections.namedtuple

    :param name: original field name
    :param pattern: compiled pattern; a name it matches is normalized
                    (by default, names starting with an underscore or a digit)
    :return: a pair of the resulting name and a flag telling
             whether the name was changed
    """
    if name in RESERVED_WORDS or pattern.match(name):
        return f'{NORMALIZATION_PREFIX}{name}', True
    return name, False


def denormalize_name(name: str) -> Tuple[str, bool]:
    """ Undo normalize_name()

    :param name: possibly normalized field name
    :return: a pair of the name without the normalization prefix and
             a flag telling whether the prefix was stripped
    """
    # every member of NORMALIZED_RESERVED_WORDS starts with the prefix,
    # so the prefix check alone covers them
    if name.startswith(NORMALIZATION_PREFIX):
        return name[len(NORMALIZATION_PREFIX):], True
    return name, False

0 comments on commit 1e096ca

Please sign in to comment.