-
Notifications
You must be signed in to change notification settings - Fork 19
/
helpers.py
164 lines (142 loc) · 4.85 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
import six
from sqlalchemy import (
Column, Text, Integer, Float, Boolean,
PrimaryKeyConstraint, ForeignKeyConstraint)
# Internal
def convert_table(prefix, table):
    """Build the database table name for a high-level table name.

    The database name is the prefix immediately followed by the table name.
    """
    database_name = prefix + table
    return database_name
def restore_table(prefix, table):
    """Recover the high-level table name from a database table name.

    Returns the name with the leading prefix removed, or None when the
    database name does not start with the prefix.
    """
    # Guard clause: names outside of the prefix namespace are not ours.
    if not table.startswith(prefix):
        return None
    # Only the leading occurrence of the prefix is dropped.
    return table[len(prefix):]
def convert_schema(prefix, table, schema):  # noqa
    """Convert JSONTableSchema schema to SQLAlchemy columns and constraints.

    Args:
        prefix: prefix passed to ``convert_table`` to build the name of a
            table referenced by a ``<table>`` foreign key.
        table: table name used as the foreign key target when a reference
            resource is ``self``.
        schema: descriptor dict with a ``fields`` list and optional
            ``primaryKey`` and ``foreignKeys`` entries.

    Returns:
        A ``(columns, constraints)`` tuple of lists.

    Raises:
        TypeError: a field type is not string, integer, number or boolean.
        ValueError: a foreign key resource is neither "self" nor "<table>".
    """

    # Init
    columns = []
    constraints = []

    # Mapping
    mapping = {
        'string': Text(),
        'integer': Integer(),
        'number': Float(),
        'boolean': Boolean(),
    }

    # Fields
    for field in schema['fields']:
        # A field without an explicit type is treated as a string. Resolving
        # the type once also keeps the error path below from raising a
        # second KeyError while building its message.
        field_type = field.get('type', 'string')
        try:
            column_type = mapping[field_type]
        except KeyError:
            message = 'Type %s is not supported' % field_type
            raise TypeError(message)
        nullable = not field.get('constraints', {}).get('required', False)
        column = Column(field['name'], column_type, nullable=nullable)
        columns.append(column)

    # Primary key
    pk = schema.get('primaryKey', None)
    if pk is not None:
        # A single column key may be given as a bare string.
        if isinstance(pk, six.string_types):
            pk = [pk]
        constraints.append(PrimaryKeyConstraint(*pk))

    # Foreign keys
    for fk in schema.get('foreignKeys', []):
        fields = fk['fields']
        if isinstance(fields, six.string_types):
            fields = [fields]
        resource = fk['reference']['resource']
        if resource == 'self':
            resource = table
        elif resource == '<table>':
            resource = convert_table(prefix, fk['reference']['table'])
        else:
            message = 'Supported only "self" and "<table>" references.'
            raise ValueError(message)
        references = fk['reference']['fields']
        if isinstance(references, six.string_types):
            references = [references]
        # Referenced columns are addressed as "<table name>.<column name>".
        references = [
            '.'.join([resource, reference]) for reference in references]
        constraints.append(ForeignKeyConstraint(fields, references))

    return (columns, constraints)
def restore_schema(prefix, table, columns, constraints):  # noqa
    """Build a JSONTableSchema descriptor from SQLAlchemy columns/constraints.

    Args:
        prefix: prefix passed to ``restore_table`` to recover the name of a
            table referenced by a foreign key.
        table: table name; foreign keys pointing at a table with this name
            are reported with the ``self`` resource.
        columns: iterable of columns exposing ``name``, ``type`` and
            ``nullable``.
        constraints: iterable of constraints; only primary key and foreign
            key constraints are inspected.

    Returns:
        A dict with a ``fields`` list and, when present, ``primaryKey`` and
        ``foreignKeys`` entries.

    Raises:
        TypeError: a column type is not Text, Integer, Float or Boolean.
    """

    # Mapping
    type_names = {
        Text: 'string',
        Integer: 'integer',
        Float: 'number',
        Boolean: 'boolean',
    }

    descriptor = {}

    # Fields
    described = []
    for column in columns:
        candidates = [
            name for sa_type, name in type_names.items()
            if isinstance(column.type, sa_type)]
        if not candidates:
            message = 'Type %s is not supported' % column.type
            raise TypeError(message)
        entry = {'name': column.name, 'type': candidates[0]}
        if not column.nullable:
            entry['constraints'] = {'required': True}
        described.append(entry)
    descriptor['fields'] = described

    # Primary key
    key_names = [
        key_column.name
        for constraint in constraints
        if isinstance(constraint, PrimaryKeyConstraint)
        for key_column in constraint.columns]
    if key_names:
        # A single column key is reported as a bare string.
        if len(key_names) == 1:
            descriptor['primaryKey'] = key_names[0]
        else:
            descriptor['primaryKey'] = key_names

    # Foreign keys
    foreign_keys = []
    for constraint in constraints:
        if not isinstance(constraint, ForeignKeyConstraint):
            continue
        own_names = []
        target_names = []
        resource = None
        reftable = None
        for element in constraint.elements:
            own_names.append(element.parent.name)
            target = element.column
            target_names.append(target.name)
            if target.table.name == table:
                resource = 'self'
            else:
                reftable = restore_table(prefix, target.table.name)
        # Single column keys are reported as bare strings, not lists.
        if len(own_names) == 1 and len(target_names) == 1:
            own_names = own_names[0]
            target_names = target_names[0]
        reference = {'fields': target_names}
        if resource is not None:
            reference['resource'] = resource
        # A restored external table name takes precedence over "self".
        if reftable is not None:
            reference['resource'] = '<table>'
            reference['table'] = reftable
        foreign_keys.append({'fields': own_names, 'reference': reference})
    if foreign_keys:
        descriptor['foreignKeys'] = foreign_keys

    return descriptor