-
Notifications
You must be signed in to change notification settings - Fork 2k
/
activity.py
324 lines (241 loc) · 10.5 KB
/
activity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# encoding: utf-8
import datetime

from sqlalchemy import (
    orm, types, Column, Table, ForeignKey, desc, or_, union_all, text)

import ckan.model
import meta
import types as _types
import domain_object
# Names exported by ``from <this module> import *``: the two mapped classes
# and the tables they are mapped onto.
__all__ = [
    'Activity',
    'activity_table',
    'ActivityDetail',
    'activity_detail_table',
]
# Table that the Activity class is mapped onto.
activity_table = Table(
    'activity', meta.metadata,
    # Primary key; _types.make_uuid supplies a value when none is given.
    Column('id', types.UnicodeText, primary_key=True,
           default=_types.make_uuid),
    Column('timestamp', types.DateTime),
    Column('user_id', types.UnicodeText),
    Column('object_id', types.UnicodeText),
    Column('revision_id', types.UnicodeText),
    Column('activity_type', types.UnicodeText),
    # Free-form payload, stored through the project's JsonDictType.
    Column('data', _types.JsonDictType),
)
# Table that the ActivityDetail class is mapped onto.
activity_detail_table = Table(
    'activity_detail', meta.metadata,
    # Primary key; _types.make_uuid supplies a value when none is given.
    Column('id', types.UnicodeText, primary_key=True,
           default=_types.make_uuid),
    # Each detail row points back at its parent row in 'activity'.
    Column('activity_id', types.UnicodeText, ForeignKey('activity.id')),
    Column('object_id', types.UnicodeText),
    Column('object_type', types.UnicodeText),
    Column('activity_type', types.UnicodeText),
    # Free-form payload, stored through the project's JsonDictType.
    Column('data', _types.JsonDictType),
)
class Activity(domain_object.DomainObject):
    '''An activity stream entry (mapped onto ``activity_table``).

    A new instance gets a freshly generated id and the current UTC time as
    its timestamp; the remaining fields come from the constructor arguments.
    '''

    def __init__(self, user_id, object_id, revision_id, activity_type,
                 data=None):
        '''Create a new activity.

        :param user_id: stored as-is in ``user_id``
        :param object_id: stored as-is in ``object_id``
        :param revision_id: stored as-is in ``revision_id``
        :param activity_type: stored as-is in ``activity_type``
        :param data: extra data for the activity; a new empty dict is used
            when omitted or None
        '''
        self.id = _types.make_uuid()
        self.timestamp = datetime.datetime.utcnow()
        self.user_id = user_id
        self.object_id = object_id
        self.revision_id = revision_id
        self.activity_type = activity_type
        # A new dict per instance, never a shared default.
        self.data = {} if data is None else data
# Map the Activity class onto activity_table.
meta.mapper(Activity, activity_table)
class ActivityDetail(domain_object.DomainObject):
    '''A detail record attached to an activity.

    Mapped onto ``activity_detail_table``; ``activity_id`` refers to the
    parent activity.
    '''

    def __init__(self, activity_id, object_id, object_type, activity_type,
                 data=None):
        '''Create a new activity detail.

        :param activity_id: id of the activity this detail belongs to
        :param object_id: stored as-is in ``object_id``
        :param object_type: stored as-is in ``object_type``
        :param activity_type: stored as-is in ``activity_type``
        :param data: extra data for the detail; a new empty dict is used
            when omitted or None
        '''
        self.activity_id = activity_id
        self.object_id = object_id
        self.object_type = object_type
        self.activity_type = activity_type
        # A new dict per instance, never a shared default.
        self.data = {} if data is None else data

    @classmethod
    def by_activity_id(cls, activity_id):
        '''Return a list of every detail whose activity_id matches.'''
        matching = ckan.model.Session.query(cls).filter_by(
            activity_id=activity_id)
        return matching.all()
# Map ActivityDetail onto activity_detail_table. Each detail reaches its
# parent through ``.activity``, and the backref adds an ``activity_detail``
# attribute to Activity.
# ``orm.relationship`` is used rather than its legacy alias ``orm.relation``.
meta.mapper(ActivityDetail, activity_detail_table, properties={
    'activity': orm.relationship(
        Activity, backref=orm.backref('activity_detail')),
})
def _activities_limit(q, limit, offset=None):
    '''Return q ordered newest-first, with an optional offset and limit.

    :param q: an SQLAlchemy query over activities
    :param limit: maximum number of rows; a falsy value (None, 0) means the
        query is left unlimited
    :param offset: number of rows to skip; a falsy value means no offset
    :returns: the resulting SQLAlchemy query (not executed)
    '''
    import ckan.model as model

    newest_first = q.order_by(desc(model.Activity.timestamp))
    # The offset, when given, is applied before the limit.
    windowed = newest_first.offset(offset) if offset else newest_first
    return windowed.limit(limit) if limit else windowed
def _activities_union_all(*qlist):
    '''Return a single Activity query over the UNION ALL of the given queries.

    Every query in qlist is turned into a subquery, the subqueries are
    combined with ``union_all`` and the result is selected as Activity
    entities with ``distinct`` applied on the activity timestamp.

    NOTE(review): de-duplication is keyed on the timestamp alone, so
    different activities that share a timestamp would presumably be
    collapsed too - confirm this is intended.  No ordering is applied here.
    '''
    import ckan.model as model

    activities = model.Session.query(model.Activity)
    combined = union_all(*[query.subquery().select() for query in qlist])
    return activities.select_entity_from(combined).distinct(
        model.Activity.timestamp)
def _activities_at_offset(q, limit, offset):
    '''Execute q and return its activities as a list.

    The query is ordered, offset and limited by ``_activities_limit`` before
    it is run.
    '''
    windowed = _activities_limit(q, limit, offset)
    return windowed.all()
def _activities_from_user_query(user_id):
    '''Return a query for the activities whose user_id is the given user.'''
    import ckan.model as model

    return model.Session.query(model.Activity).filter(
        model.Activity.user_id == user_id)
def _activities_about_user_query(user_id):
    '''Return a query for the activities whose object_id is the given user.'''
    import ckan.model as model

    return model.Session.query(model.Activity).filter(
        model.Activity.object_id == user_id)
def _user_activity_query(user_id, limit):
    '''Return a query for the activities from or about user_id.

    The "from" and "about" queries are each limited to ``limit`` rows before
    being combined.
    '''
    limited = [
        _activities_limit(build_query(user_id), limit)
        for build_query in (_activities_from_user_query,
                            _activities_about_user_query)
    ]
    return _activities_union_all(*limited)
def user_activity_list(user_id, limit, offset):
    '''Return the public activity stream of user_id as a list.

    The stream holds the activities from or about the user, i.e. those where
    the user is the subject or the object of the activity, e.g.:

    "{USER} created the dataset {DATASET}"
    "{OTHER_USER} started following {USER}"

    etc.

    Each underlying query is capped at ``limit + offset`` rows before the
    offset/limit window is applied to the combined result.
    '''
    fetch_count = limit + offset
    combined = _user_activity_query(user_id, fetch_count)
    return _activities_at_offset(combined, limit, offset)
def _package_activity_query(package_id):
    '''Return a query for the activities whose object_id is package_id.'''
    import ckan.model as model

    return model.Session.query(model.Activity).filter_by(
        object_id=package_id)
def package_activity_list(package_id, limit, offset):
    '''Return the public activity stream of a dataset (package) as a list.

    The stream holds the activities about the dataset, i.e. those where the
    dataset is the object of the activity, e.g.:

    "{USER} created the dataset {DATASET}"
    "{USER} updated the dataset {DATASET}"

    etc.
    '''
    about_package = _package_activity_query(package_id)
    return _activities_at_offset(about_package, limit, offset)
def _group_activity_query(group_id):
    '''Return an SQLAlchemy query for all activities about group_id.

    The query matches the activities whose object is either the group itself
    or one of the group's datasets.  When no group is found for group_id the
    returned query matches nothing.

    NOTE(review): the filter compares object_id with group_id exactly as it
    was passed in, not with the id of the group that was looked up - confirm
    callers always pass the group's id.
    '''
    import ckan.model as model

    group = model.Group.get(group_id)
    if not group:
        # Return a query with no results.  The always-false condition is
        # wrapped in text() because SQLAlchemy no longer accepts raw SQL
        # strings as filter criteria.
        return model.Session.query(model.Activity).filter(text("0=1"))

    dataset_ids = [dataset.id for dataset in group.packages()]

    q = model.Session.query(model.Activity)
    if dataset_ids:
        q = q.filter(or_(model.Activity.object_id == group_id,
                         model.Activity.object_id.in_(dataset_ids)))
    else:
        # Without datasets, leave the IN clause out rather than pass it an
        # empty list.
        q = q.filter(model.Activity.object_id == group_id)
    return q
def group_activity_list(group_id, limit, offset):
    '''Return the public activity stream of a group as a list.

    The stream holds the activities whose object is the group or one of its
    datasets, e.g.:

    "{USER} updated the group {GROUP}"
    "{USER} updated the dataset {DATASET}"

    etc.
    '''
    about_group = _group_activity_query(group_id)
    return _activities_at_offset(about_group, limit, offset)
def _activites_from_users_followed_by_user_query(user_id, limit):
    '''Return a query for all activities from users that user_id follows.

    For every followed user the query built by ``_user_activity_query`` is
    taken with the given limit, and all of them are combined.  When the user
    follows no users the returned query matches nothing.
    '''
    import ckan.model as model

    # Get a list of the users that the given user is following.
    follower_objects = model.UserFollowingUser.followee_list(user_id)
    if not follower_objects:
        # Return a query with no results.  The always-false condition is
        # wrapped in text() because SQLAlchemy no longer accepts raw SQL
        # strings as filter criteria.
        return model.Session.query(model.Activity).filter(text("0=1"))

    # object_id of each follower object is the id of the followed user.
    return _activities_union_all(*[
        _user_activity_query(follower.object_id, limit)
        for follower in follower_objects])
def _activities_from_datasets_followed_by_user_query(user_id, limit):
    '''Return a query for all activities from datasets that user_id follows.

    For every followed dataset the query built by ``_package_activity_query``
    is limited to ``limit`` rows, and all of them are combined.  When the
    user follows no datasets the returned query matches nothing.
    '''
    import ckan.model as model

    # Get a list of the datasets that the user is following.
    follower_objects = model.UserFollowingDataset.followee_list(user_id)
    if not follower_objects:
        # Return a query with no results.  The always-false condition is
        # wrapped in text() because SQLAlchemy no longer accepts raw SQL
        # strings as filter criteria.
        return model.Session.query(model.Activity).filter(text("0=1"))

    # object_id of each follower object is the id of the followed dataset.
    return _activities_union_all(*[
        _activities_limit(_package_activity_query(follower.object_id), limit)
        for follower in follower_objects])
def _activities_from_groups_followed_by_user_query(user_id, limit):
    '''Return a query for all activities about groups the given user follows.

    Return a query for all activities about the groups the given user
    follows, or about any of the groups' datasets.  This is the union of
    _group_activity_query(group_id), each limited to ``limit`` rows, for each
    of the groups the user follows.  When the user follows no groups the
    returned query matches nothing.
    '''
    import ckan.model as model

    # Get a list of the groups that the user is following.
    follower_objects = model.UserFollowingGroup.followee_list(user_id)
    if not follower_objects:
        # Return a query with no results.  The always-false condition is
        # wrapped in text() because SQLAlchemy no longer accepts raw SQL
        # strings as filter criteria.
        return model.Session.query(model.Activity).filter(text("0=1"))

    # object_id of each follower object is the id of the followed group.
    return _activities_union_all(*[
        _activities_limit(_group_activity_query(follower.object_id), limit)
        for follower in follower_objects])
def _activities_from_everything_followed_by_user_query(user_id, limit):
    '''Return a query for the activities from everything user_id follows.

    Combines the followed-users, followed-datasets and followed-groups
    queries, in that order, each built with the given limit.
    '''
    from_users = _activites_from_users_followed_by_user_query(user_id, limit)
    from_datasets = _activities_from_datasets_followed_by_user_query(
        user_id, limit)
    from_groups = _activities_from_groups_followed_by_user_query(
        user_id, limit)
    return _activities_union_all(from_users, from_datasets, from_groups)
def activities_from_everything_followed_by_user(user_id, limit, offset):
    '''Return a list of the activities from everything user_id follows.

    The list holds the activities whose object is anything (user, dataset,
    group...) that the given user is following.

    The underlying queries are built with a limit of ``limit + offset``
    before the offset/limit window is applied to the combined result.
    '''
    fetch_count = limit + offset
    followed = _activities_from_everything_followed_by_user_query(
        user_id, fetch_count)
    return _activities_at_offset(followed, limit, offset)
def _dashboard_activity_query(user_id, limit):
    '''Return a query for the dashboard activity stream of user_id.

    Combines the user's own activity query with the query for everything
    the user follows, both built with the given limit.
    '''
    own = _user_activity_query(user_id, limit)
    followed = _activities_from_everything_followed_by_user_query(
        user_id, limit)
    return _activities_union_all(own, followed)
def dashboard_activity_list(user_id, limit, offset):
    '''Return the dashboard activity stream of user_id as a list.

    The stream holds the activities from the user's public activity stream
    plus the activities from everything that the user is following, i.e. the
    union of user_activity_list(user_id) and
    activities_from_everything_followed_by_user(user_id).

    The underlying queries are built with a limit of ``limit + offset``
    before the offset/limit window is applied to the combined result.
    '''
    fetch_count = limit + offset
    dashboard = _dashboard_activity_query(user_id, fetch_count)
    return _activities_at_offset(dashboard, limit, offset)
def _changed_packages_activity_query():
    '''Return an SQLAlchemy query for all changed package activities.

    The query matches every activity whose activity_type ends with
    'package', e.g. 'new_package', 'changed_package', 'deleted_package'.
    '''
    import ckan.model as model

    return model.Session.query(model.Activity).filter(
        model.Activity.activity_type.endswith('package'))
def recently_changed_packages_activity_list(limit, offset):
    '''Return the site-wide list of recently changed package activities.

    The stream holds the recent 'new package', 'changed package' and
    'deleted package' activities for the whole site.
    '''
    changed_packages = _changed_packages_activity_query()
    return _activities_at_offset(changed_packages, limit, offset)