-
Notifications
You must be signed in to change notification settings - Fork 2k
/
revision_legacy_code.py
203 lines (173 loc) · 7.41 KB
/
revision_legacy_code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# encoding: utf-8
from sqlalchemy.sql import select
import ckan.logic as logic
import ckan.lib.dictization as d
from ckan.lib.dictization.model_dictize import (
_execute, resource_list_dictize, extras_list_dictize, group_list_dictize)
from ckan import model
# This is based on ckan.lib.dictization.model_dictize:package_dictize
# BUT you can ask for a old revision to the package by specifying 'revision_id'
# in the context
def package_dictize_with_revisions(pkg, context):
'''
Given a Package object, returns an equivalent dictionary.
Normally this is the most recent version, but you can provide revision_id
or revision_date in the context and it will filter to an earlier time.
May raise NotFound if:
* the specified revision_id doesn't exist
* the specified revision_date was before the package was created
'''
model = context['model']
is_latest_revision = not(context.get(u'revision_id') or
context.get(u'revision_date'))
execute = _execute if is_latest_revision else _execute_with_revision
# package
if is_latest_revision:
if isinstance(pkg, revision_model.PackageRevision):
pkg = model.Package.get(pkg.id)
result = pkg
else:
package_rev = revision_model.package_revision_table
q = select([package_rev]).where(package_rev.c.id == pkg.id)
result = execute(q, package_rev, context).first()
if not result:
raise logic.NotFound
result_dict = d.table_dictize(result, context)
# strip whitespace from title
if result_dict.get(u'title'):
result_dict['title'] = result_dict['title'].strip()
# resources
if is_latest_revision:
res = model.resource_table
else:
res = revision_model.resource_revision_table
q = select([res]).where(res.c.package_id == pkg.id)
result = execute(q, res, context)
result_dict["resources"] = resource_list_dictize(result, context)
result_dict['num_resources'] = len(result_dict.get(u'resources', []))
# tags
tag = model.tag_table
if is_latest_revision:
pkg_tag = model.package_tag_table
else:
pkg_tag = revision_model.package_tag_revision_table
q = select([tag, pkg_tag.c.state],
from_obj=pkg_tag.join(tag, tag.c.id == pkg_tag.c.tag_id)
).where(pkg_tag.c.package_id == pkg.id)
result = execute(q, pkg_tag, context)
result_dict["tags"] = d.obj_list_dictize(result, context,
lambda x: x["name"])
result_dict['num_tags'] = len(result_dict.get(u'tags', []))
# Add display_names to tags. At first a tag's display_name is just the
# same as its name, but the display_name might get changed later (e.g.
# translated into another language by the multilingual extension).
for tag in result_dict['tags']:
assert u'display_name' not in tag
tag['display_name'] = tag['name']
# extras
if is_latest_revision:
extra = model.package_extra_table
else:
extra = revision_model.extra_revision_table
q = select([extra]).where(extra.c.package_id == pkg.id)
result = execute(q, extra, context)
result_dict["extras"] = extras_list_dictize(result, context)
# groups
if is_latest_revision:
member = model.member_table
else:
member = revision_model.member_revision_table
group = model.group_table
q = select([group, member.c.capacity],
from_obj=member.join(group, group.c.id == member.c.group_id)
).where(member.c.table_id == pkg.id)\
.where(member.c.state == u'active') \
.where(group.c.is_organization == False) # noqa
result = execute(q, member, context)
context['with_capacity'] = False
# no package counts as cannot fetch from search index at the same
# time as indexing to it.
# tags, extras and sub-groups are not included for speed
result_dict["groups"] = group_list_dictize(result, context,
with_package_counts=False)
# owning organization
if is_latest_revision:
group = model.group_table
else:
group = revision_model.group_revision_table
q = select([group]
).where(group.c.id == result_dict['owner_org']) \
.where(group.c.state == u'active')
result = execute(q, group, context)
organizations = d.obj_list_dictize(result, context)
if organizations:
result_dict["organization"] = organizations[0]
else:
result_dict["organization"] = None
# relations
if is_latest_revision:
rel = model.package_relationship_table
else:
rel = revision_model \
.package_relationship_revision_table
q = select([rel]).where(rel.c.subject_package_id == pkg.id)
result = execute(q, rel, context)
result_dict["relationships_as_subject"] = \
d.obj_list_dictize(result, context)
q = select([rel]).where(rel.c.object_package_id == pkg.id)
result = execute(q, rel, context)
result_dict["relationships_as_object"] = \
d.obj_list_dictize(result, context)
# Extra properties from the domain object
# We need an actual Package object for this, not a PackageRevision
# if isinstance(pkg, model.PackageRevision):
# pkg = model.Package.get(pkg.id)
# isopen
result_dict['isopen'] = pkg.isopen if isinstance(pkg.isopen, bool) \
else pkg.isopen()
# type
# if null assign the default value to make searching easier
result_dict['type'] = pkg.type or u'dataset'
# license
if pkg.license and pkg.license.url:
result_dict['license_url'] = pkg.license.url
result_dict['license_title'] = pkg.license.title.split(u'::')[-1]
elif pkg.license:
result_dict['license_title'] = pkg.license.title
else:
result_dict['license_title'] = pkg.license_id
# creation and modification date
if is_latest_revision:
result_dict['metadata_modified'] = pkg.metadata_modified.isoformat()
# (If not is_latest_revision, don't use pkg which is the latest version.
# Instead, use the dates already in result_dict that came from the dictized
# PackageRevision)
result_dict['metadata_created'] = pkg.metadata_created.isoformat()
return result_dict
def _execute_with_revision(q, rev_table, context):
'''
Takes an SqlAlchemy query (q) that is (at its base) a Select on an object
revision table (rev_table), and you provide revision_id or revision_date in
the context and it will filter the object revision(s) to an earlier time.
Raises NotFound if context['revision_id'] is provided, but the revision
ID does not exist.
Returns [] if there are no results.
'''
model = context['model']
session = model.Session
revision_id = context.get(u'revision_id')
revision_date = context.get(u'revision_date')
if revision_id:
revision = session.query(revision_model.Revision) \
.filter_by(id=revision_id).first()
if not revision:
raise logic.NotFound
revision_date = revision.timestamp
q = q.where(rev_table.c.revision_timestamp <= revision_date)
q = q.where(rev_table.c.expired_timestamp > revision_date)
return session.execute(q)
# It's easiest if this code works on all versions of CKAN. After CKAN 2.8 the
# revision model is separate from the main model.
# (RevisionTableMappings will be defined above)
# CKAN<=2.8
revision_model = model