-
Notifications
You must be signed in to change notification settings - Fork 2k
/
test_dashboard.py
216 lines (183 loc) · 9.57 KB
/
test_dashboard.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import ckan
from ckan.lib.helpers import json
import paste
import pylons.test
class TestDashboard(object):
'''Tests for the logic action functions related to the user's dashboard.'''
@classmethod
def user_create(cls):
'''Create a new user.'''
params = json.dumps({
'name': 'mr_new_user',
'email': 'mr@newuser.com',
'password': 'iammrnew',
})
response = cls.app.post('/api/action/user_create', params=params,
extra_environ={'Authorization': str(cls.joeadmin['apikey'])})
assert response.json['success'] is True
new_user = response.json['result']
return new_user
@classmethod
def setup_class(cls):
ckan.tests.CreateTestData.create()
cls.app = paste.fixture.TestApp(pylons.test.pylonsapp)
joeadmin = ckan.model.User.get('joeadmin')
cls.joeadmin = {
'id': joeadmin.id,
'apikey': joeadmin.apikey
}
annafan = ckan.model.User.get('annafan')
cls.annafan = {
'id': annafan.id,
'apikey': annafan.apikey
}
testsysadmin = ckan.model.User.get('testsysadmin')
cls.testsysadmin = {
'id': testsysadmin.id,
'apikey': testsysadmin.apikey
}
cls.new_user = cls.user_create()
@classmethod
def teardown_class(cls):
ckan.model.repo.rebuild_db()
def post(self, action, params=None, apikey=None, status=200):
'''Post to the CKAN API and return the result.'''
if params is None:
params = {}
params = json.dumps(params)
response = self.app.post('/api/action/{0}'.format(action),
params=params,
extra_environ={'Authorization': str(apikey)},
status=status)
if status in (200,):
assert response.json['success'] is True
return response.json['result']
else:
assert response.json['success'] is False
return response.json['error']
def dashboard_new_activities_count(self, user):
'''Return the given user's new activities count from the CKAN API.'''
return self.post('dashboard_new_activities_count',
apikey=user['apikey'])
def dashboard_activity_list(self, user):
'''Return the given user's dashboard activity list from the CKAN API.
'''
return self.post('dashboard_activity_list', apikey=user['apikey'])
def dashboard_new_activities(self, user):
'''Return the activities from the user's dashboard activity stream
that are currently marked as new.'''
activity_list = self.dashboard_activity_list(user)
return [activity for activity in activity_list if activity['is_new']]
def dashboard_mark_activities_old(self, user):
self.post('dashboard_mark_activities_old',
apikey=user['apikey'])
def test_00_dashboard_activity_list_not_logged_in(self):
self.post('dashboard_activity_list', status=403)
def test_00_dashboard_new_activities_count_not_logged_in(self):
self.post('dashboard_new_activities_count', status=403)
def test_00_dashboard_mark_new_activities_not_logged_in(self):
self.post('dashboard_mark_activities_old', status=403)
def test_01_new_activities_count_for_new_user(self):
'''Test that a newly registered user's new activities count is 0.'''
assert self.dashboard_new_activities_count(self.new_user) == 0
def test_01_new_activities_for_new_user(self):
'''Test that a newly registered user has no activities marked as new
in their dashboard activity stream.'''
assert len(self.dashboard_new_activities(self.new_user)) == 0
def test_02_own_activities_do_not_count_as_new(self):
'''Make a user do some activities and check that her own activities
don't increase her new activities count.'''
# The user has to view her dashboard activity stream first to mark any
# existing activities as read. For example when she follows a dataset
# below, past activities from the dataset (e.g. when someone created
# the dataset, etc.) will appear in her dashboard, and if she has never
# viewed her dashboard then those activities will be considered
# "unseen".
# We would have to do this if, when you follow something, you only get
# the activities from that object since you started following it, and
# not all its past activities as well.
self.dashboard_mark_activities_old(self.new_user)
# Create a new dataset.
params = json.dumps({
'name': 'my_new_package',
})
response = self.app.post('/api/action/package_create', params=params,
extra_environ={'Authorization': str(self.new_user['apikey'])})
assert response.json['success'] is True
# Follow a dataset.
params = json.dumps({'id': 'warandpeace'})
response = self.app.post('/api/action/follow_dataset', params=params,
extra_environ={'Authorization': str(self.new_user['apikey'])})
assert response.json['success'] is True
# Follow a user.
params = json.dumps({'id': 'annafan'})
response = self.app.post('/api/action/follow_user', params=params,
extra_environ={'Authorization': str(self.new_user['apikey'])})
assert response.json['success'] is True
# Follow a group.
params = json.dumps({'id': 'roger'})
response = self.app.post('/api/action/follow_group', params=params,
extra_environ={'Authorization': str(self.new_user['apikey'])})
assert response.json['success'] is True
# Update the dataset that we're following.
params = json.dumps({'name': 'warandpeace', 'notes': 'updated'})
response = self.app.post('/api/action/package_update', params=params,
extra_environ={'Authorization': str(self.new_user['apikey'])})
assert response.json['success'] is True
# User's own actions should not increase her activity count.
assert self.dashboard_new_activities_count(self.new_user) == 0
def test_03_own_activities_not_marked_as_new(self):
'''Make a user do some activities and check that her own activities
aren't marked as new in her dashboard activity stream.'''
assert len(self.dashboard_new_activities(self.new_user)) == 0
def test_04_new_activities_count(self):
'''Test that new activities from objects that a user follows increase
her new activities count.'''
# Make someone else who new_user is not following update a dataset that
# new_user is following.
params = json.dumps({'name': 'warandpeace', 'notes': 'updated again'})
response = self.app.post('/api/action/package_update', params=params,
extra_environ={'Authorization': str(self.joeadmin['apikey'])})
assert response.json['success'] is True
# Make someone that the user is following create a new dataset.
params = json.dumps({'name': 'annas_new_dataset'})
response = self.app.post('/api/action/package_create', params=params,
extra_environ={'Authorization': str(self.annafan['apikey'])})
assert response.json['success'] is True
# Make someone that the user is not following update a dataset that
# the user is not following, but that belongs to a group that the user
# is following.
params = json.dumps({'name': 'annakarenina', 'notes': 'updated'})
response = self.app.post('/api/action/package_update', params=params,
extra_environ={'Authorization': str(self.testsysadmin['apikey'])})
assert response.json['success'] is True
# FIXME: The number here should be 3 but activities from followed
# groups are not appearing in dashboard. When that is fixed, fix this
# number.
assert self.dashboard_new_activities_count(self.new_user) == 2
def test_05_activities_marked_as_new(self):
'''Test that new activities from objects that a user follows are
marked as new in her dashboard activity stream.'''
# FIXME: The number here should be 3 but activities from followed
# groups are not appearing in dashboard. When that is fixed, fix this
# number.
assert len(self.dashboard_new_activities(self.new_user)) == 2
def test_06_mark_new_activities_as_read(self):
'''Test that a user's new activities are marked as old when she views
her dashboard activity stream.'''
assert self.dashboard_new_activities_count(self.new_user) > 0
assert len(self.dashboard_new_activities(self.new_user)) > 0
self.dashboard_mark_activities_old(self.new_user)
assert self.dashboard_new_activities_count(self.new_user) == 0
assert len(self.dashboard_new_activities(self.new_user)) == 0
def test_07_maximum_number_of_new_activities(self):
'''Test that the new activities count does not go higher than 15, even
if there are more than 15 new activities from the user's followers.'''
for n in range(0, 20):
notes = "Updated {n} times".format(n=n)
params = json.dumps({'name': 'warandpeace', 'notes': notes})
response = self.app.post('/api/action/package_update',
params=params,
extra_environ={'Authorization': str(self.joeadmin['apikey'])})
assert response.json['success'] is True
assert self.dashboard_new_activities_count(self.new_user) == 15