-
Notifications
You must be signed in to change notification settings - Fork 2k
/
__init__.py
156 lines (116 loc) · 4.48 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# encoding: utf-8
import datetime
from sqlalchemy.orm import class_mapper
import sqlalchemy
from six import text_type
from ckan.common import config
from ckan.model.core import State
try:
RowProxy = sqlalchemy.engine.result.RowProxy
except AttributeError:
RowProxy = sqlalchemy.engine.base.RowProxy
try:
long # Python 2
except NameError:
long = int # Python 3
# NOTE
# The functions in this file contain very generic methods for dictizing objects
# and saving dictized objects. If a specialised use is needed please do NOT extend
# these functions. Copy code from here as needed.
def table_dictize(obj, context, **kw):
'''Get any model object and represent it as a dict'''
result_dict = {}
model = context["model"]
session = model.Session
if isinstance(obj, RowProxy):
fields = obj.keys()
else:
ModelClass = obj.__class__
table = class_mapper(ModelClass).mapped_table
fields = [field.name for field in table.c]
for field in fields:
name = field
if name in ('current', 'expired_timestamp', 'expired_id'):
continue
if name in ('continuity_id', 'revision_id'):
continue
value = getattr(obj, name)
if value is None:
result_dict[name] = value
elif isinstance(value, dict):
result_dict[name] = value
elif isinstance(value, int):
result_dict[name] = value
elif isinstance(value, long):
result_dict[name] = value
elif isinstance(value, datetime.datetime):
result_dict[name] = value.isoformat()
elif isinstance(value, list):
result_dict[name] = value
else:
result_dict[name] = text_type(value)
result_dict.update(kw)
##HACK For optimisation to get metadata_modified created faster.
context['metadata_modified'] = max(result_dict.get('revision_timestamp', ''),
context.get('metadata_modified', ''))
return result_dict
def obj_list_dictize(obj_list, context, sort_key=lambda x:x):
'''Get a list of model object and represent it as a list of dicts'''
result_list = []
active = context.get('active', True)
for obj in obj_list:
if context.get('with_capacity'):
obj, capacity = obj
dictized = table_dictize(obj, context, capacity=capacity)
else:
dictized = table_dictize(obj, context)
if active and obj.state != 'active':
continue
result_list.append(dictized)
return sorted(result_list, key=sort_key)
def obj_dict_dictize(obj_dict, context, sort_key=lambda x:x):
'''Get a dict whose values are model objects
and represent it as a list of dicts'''
result_list = []
for key, obj in obj_dict.items():
result_list.append(table_dictize(obj, context))
return sorted(result_list, key=sort_key)
def get_unique_constraints(table, context):
'''Get a list of unique constraints for a sqlalchemy table'''
list_of_constraints = []
for contraint in table.constraints:
if isinstance(contraint, sqlalchemy.UniqueConstraint):
columns = [column.name for column in contraint.columns]
list_of_constraints.append(columns)
return list_of_constraints
def table_dict_save(table_dict, ModelClass, context):
'''Given a dict and a model class, update or create a sqlalchemy object.
This will use an existing object if "id" is supplied OR if any unique
constraints are met. e.g supplying just a tag name will get out that tag obj.
'''
model = context["model"]
session = context["session"]
table = class_mapper(ModelClass).mapped_table
obj = None
id = table_dict.get("id")
if id:
obj = session.query(ModelClass).get(id)
if not obj:
unique_constraints = get_unique_constraints(table, context)
for constraint in unique_constraints:
params = dict((key, table_dict.get(key)) for key in constraint)
obj = session.query(ModelClass).filter_by(**params).first()
if obj:
if 'name' in params and getattr(obj, 'state', None) == State.DELETED:
obj.name = obj.id
obj = None
else:
break
if not obj:
obj = ModelClass()
for key, value in table_dict.iteritems():
if isinstance(value, list):
continue
setattr(obj, key, value)
session.add(obj)
return obj