-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
importer.py
140 lines (118 loc) · 3.93 KB
/
importer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Allows importing .gql files as Python modules, tied into the rest of the
library.
"""
import ast
import graphql
from import_x import ExtensionLoader
from ._mod_impl import __query__
from .providers import query_for_schema
def build_func(provider, definition):
    """
    Builds a python function from a GraphQL AST definition

    The generated function is named after the operation, takes every GraphQL
    variable as a keyword-only argument, and returns
    ``__query__(provider, source, **variables)``, where ``source`` is the
    printed text of the operation.

    :param provider: Provider name embedded as a literal in the generated call
        (may be ``None``).
    :param definition: A GraphQL operation definition node.
    :returns: An ``ast.FunctionDef`` without location information; callers are
        expected to run ``ast.fix_missing_locations`` before compiling.
    :raises ValueError: If the operation is anonymous or is a subscription.
    """
    # Explicit raises rather than ``assert``: asserts vanish under ``python -O``.
    if definition.name is None:
        raise ValueError("Can't build a function from an anonymous operation")
    if definition.operation == graphql.OperationType.SUBSCRIPTION:
        raise ValueError("Can't build a function from a subscription")

    name = definition.name.value
    source = graphql.print_ast(definition)
    params = [build_param(var) for var in definition.variable_definitions]

    # TODO: Line numbers
    return ast.FunctionDef(
        name=name,
        args=ast.arguments(
            # Field added in Python 3.8; given explicitly so that compile()
            # does not reject the node for a missing list field.
            posonlyargs=[],
            args=[],
            defaults=[],
            kwonlyargs=[
                ast.arg(arg=param, annotation=None) for param, _ in params
            ],
            # A None entry marks a keyword-only argument as required.
            kw_defaults=[default for _, default in params],
            vararg=None,
            kwarg=None,
        ),
        body=[
            ast.Return(
                value=ast.Call(
                    func=ast.Name(id='__query__', ctx=ast.Load()),
                    args=[
                        value2pyliteral(provider),
                        value2pyliteral(source),
                    ],
                    keywords=[
                        ast.keyword(
                            arg=param,
                            value=ast.Name(id=param, ctx=ast.Load()),
                        )
                        for param, _ in params
                    ],
                ),
            ),
        ],
        decorator_list=[],
    )
def build_param(var):
    """
    Translates a GraphQL variable definition into a ``(name, default)`` pair

    :param var: A GraphQL variable definition node.
    :returns: The variable name and either an AST literal for its default
        value or ``None`` when the argument is required (non-null type with
        no default).
    """
    name = var.variable.name.value
    # NOTE: Don't care about the type name until we can start building annotations
    # Anything that is not wrapped in a non-null type is nullable; this covers
    # list types as well as plain named types.
    nullable = var.type.kind != 'non_null_type'

    if not nullable and var.default_value is None:
        # Required argument: no default at all.
        return name, None

    # Nullable variables without an explicit default fall back to None.
    return name, value2pyliteral(gqlliteral2value(var.default_value))
def value2pyliteral(val):
    """
    Wraps a plain Python value in the AST node for the matching literal

    :param val: An ``int`` (including ``bool``), ``float``, ``str`` or
        ``None``.
    :returns: An ``ast.Constant`` holding ``val``.
    :raises ValueError: If ``val`` is of any other type.
    """
    # ast.Constant replaces the deprecated ast.Num / ast.Str /
    # ast.NameConstant node classes.
    if val is None or isinstance(val, (int, float, str)):
        return ast.Constant(value=val)
    raise ValueError(f"Can't translate {val!r}")
def gqlliteral2value(node):
    """
    Converts a GraphQL value node into a plain Python value

    A missing node (``None``) is passed through as ``None``; anything else is
    handed to ``graphql.value_from_ast_untyped``.
    """
    return None if node is None else graphql.value_from_ast_untyped(node)
class GqlLoader(ExtensionLoader):
    """
    Loader that turns ``.gql`` files into Python modules

    Each named operation in the file becomes a module-level function.
    """
    # Presumably read by ExtensionLoader to decide which files to handle and
    # whether to register the loader automatically -- confirm against import_x.
    extension = '.gql'
    auto_enable = True

    @staticmethod
    def handle_module(module, path):
        """
        Populates ``module`` with functions built from the GraphQL file

        :param module: The module object to fill in.
        :param path: Filesystem path of the ``.gql`` file.
        :raises: The validation error with the earliest source location when
            the file declares a provider and fails validation against that
            provider's schema.
        """
        with open(path, 'rt', encoding='utf-8') as fobj:
            provider, code = read_code(fobj)

        gast = graphql.parse(graphql.Source(code, path))

        if provider is not None:
            schema = query_for_schema(provider)
            errors = graphql.validate(schema, gast)
            if errors:
                raise find_first_error(errors)
        # TODO: Error or warning when no provider is declared?

        mod = ast.Module(
            body=[
                build_func(provider, defin)
                for defin in gast.definitions
                if defin.kind == 'operation_definition'
            ],
            # Field added in Python 3.8; given explicitly so that compile()
            # does not reject the node for a missing list field.
            type_ignores=[],
        )
        ast.fix_missing_locations(mod)

        # The generated functions look up __query__ in the module globals.
        module.__query__ = __query__
        compiled = compile(mod, path, 'exec')
        exec(compiled, vars(module))
def read_code(fobj):
    """
    Reads a GraphQL file, extracting the provider declaration if present

    The provider is written as ``#~provider~`` (spaces and tabs are ignored)
    and must be in the initial block of ``#`` comment lines.

    :param fobj: A seekable text file object.
    :returns: ``(provider, text)`` where ``provider`` is ``None`` when no
        declaration was found and ``text`` is the entire file contents.
    """
    drop_blanks = str.maketrans('', '', ' \t')
    provider = None

    for raw in fobj:
        compact = raw.strip().translate(drop_blanks)
        if not compact.startswith('#'):
            # Left the leading comment block without finding a provider.
            break
        if compact.startswith('#~') and compact.endswith('~'):
            provider = compact[2:-1]
            break

    # Rewind so the caller receives the complete source, header included.
    fobj.seek(0)
    text = fobj.read()
    return provider, text
def find_first_error(errors):
    """
    Picks the error that occurs earliest in the source

    Errors are ordered by the ``(line, column)`` of their locations. Errors
    whose ``locations`` is ``None`` or empty are only considered when no
    error has a location, in which case the first error given is returned.

    :param errors: A non-empty iterable of errors, each with a ``locations``
        attribute.
    :returns: The chosen error object.
    :raises IndexError: If ``errors`` is empty.
    """
    errors = list(errors)
    by_location = {
        loc: err
        for err in errors
        # ``locations`` may be None when an error is not tied to the source.
        for loc in (err.locations or ())
    }
    if not by_location:
        return errors[0]
    earliest = min(by_location, key=lambda loc: (loc.line, loc.column))
    return by_location[earliest]