-
Notifications
You must be signed in to change notification settings - Fork 24
/
test_auth.py
227 lines (187 loc) · 7.6 KB
/
test_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
libG(oogle)Reader
Copyright (C) 2010 Matt Behrens <askedrelic@gmail.com> http://asktherelic.com
Python library for working with the unofficial Google Reader API.
Unit tests for oauth and ClientAuthMethod in libgreader.
"""
try:
import unittest2 as unittest
except:
import unittest
from libgreader import GoogleReader, OAuthMethod, OAuth2Method, ClientAuthMethod, Feed
import requests
import re
from .config import *
class TestClientAuthMethod(unittest.TestCase):
def test_ClientAuthMethod_login(self):
ca = ClientAuthMethod(username,password)
self.assertNotEqual(ca, None)
def test_reader(self):
ca = ClientAuthMethod(username,password)
reader = GoogleReader(ca)
self.assertNotEqual(reader, None)
def test_bad_user_details(self):
self.assertRaises(IOError, ClientAuthMethod, 'asdsa', '')
def test_reader_user_info(self):
ca = ClientAuthMethod(username,password)
reader = GoogleReader(ca)
info = reader.getUserInfo()
self.assertEqual(dict, type(info))
self.assertEqual(firstname, info['userName'])
#automated approval of oauth url
#returns mechanize Response of the last "You have accepted" page
def automated_oauth_approval(url):
#general process is:
# 1. assume user isn't logged in, so get redirected to google accounts
# login page. login using test account credentials
# 2. redirected back to oauth approval page. br.submit() should choose the
# first submit on that page, which is the "Accept" button
br = mechanize.Browser()
br.open(url)
br.select_form(nr=0)
br["Email"] = username
br["Passwd"] = password
response1 = br.submit()
br.select_form(nr=0)
response2 = br.submit()
return response2
@unittest.skip('being deprecated')
class TestOAuth(unittest.TestCase):
def test_oauth_login(self):
auth = OAuthMethod(oauth_key, oauth_secret)
self.assertNotEqual(auth, None)
def test_getting_request_token(self):
auth = OAuthMethod(oauth_key, oauth_secret)
token, token_secret = auth.setAndGetRequestToken()
url = auth.buildAuthUrl()
response = automated_oauth_approval(url)
self.assertNotEqual(-1,response.get_data().find('You have successfully granted'))
def test_full_auth_process_without_callback(self):
auth = OAuthMethod(oauth_key, oauth_secret)
auth.setRequestToken()
auth_url = auth.buildAuthUrl()
response = automated_oauth_approval(auth_url)
auth.setAccessToken()
reader = GoogleReader(auth)
info = reader.getUserInfo()
self.assertEqual(dict, type(info))
self.assertEqual(firstname, info['userName'])
def test_full_auth_process_with_callback(self):
auth = OAuthMethod(oauth_key, oauth_secret)
#must be a working callback url for testing
auth.setCallback("http://www.asktherelic.com")
token, token_secret = auth.setAndGetRequestToken()
auth_url = auth.buildAuthUrl()
#callback section
#get response, which is a redirect to the callback url
response = automated_oauth_approval(auth_url)
query_string = urlparse.urlparse(response.geturl()).query
#grab the verifier token from the callback url query string
token_verifier = urlparse.parse_qs(query_string)['oauth_verifier'][0]
auth.setAccessTokenFromCallback(token, token_secret, token_verifier)
reader = GoogleReader(auth)
info = reader.getUserInfo()
self.assertEqual(dict, type(info))
self.assertEqual(firstname, info['userName'])
#automate getting the approval token
def mechanize_oauth2_approval(url):
"""
general process is:
1. assume user isn't logged in, so get redirected to google accounts
login page. login using account credentials
But, if the user has already granted access, the user is auto redirected without
having to confirm again.
2. redirected back to oauth approval page. br.submit() should choose the
first submit on that page, which is the "Accept" button
3. mechanize follows the redirect, and should throw 40X exception and
we return the token
"""
br = mechanize.Browser()
br.open(url)
br.select_form(nr=0)
br["Email"] = username
br["Passwd"] = password
try:
response1 = br.submit()
br.select_form(nr=0)
response2 = br.submit()
except Exception as e:
#watch for 40X exception on trying to load redirect page
pass
callback_url = br.geturl()
# split off the token in hackish fashion
return callback_url.split('code=')[1]
def automated_oauth2_approval(url):
"""
general process is:
1. assume user isn't logged in, so get redirected to google accounts
login page. login using account credentials
2. get redirected to oauth approval screen
3. authorize oauth app
"""
auth_url = url
headers = {'Referer': auth_url}
s = requests.Session()
r1 = s.get(auth_url)
post_data = dict((x[0],x[1]) for x in re.findall('name="(.*?)".*?value="(.*?)"', str(r1.content), re.MULTILINE))
post_data['Email'] = username
post_data['Passwd'] = password
post_data['timeStmp'] = ''
post_data['secTok'] = ''
post_data['signIn'] = 'Sign in'
post_data['GALX'] = s.cookies['GALX']
r2 = s.post('https://accounts.google.com/ServiceLoginAuth', data=post_data, headers=headers, allow_redirects=False)
#requests is fucking up the url encoding and double encoding ampersands
scope_url = r2.headers['location'].replace('amp%3B','')
# now get auth screen
r3 = s.get(scope_url)
# unless we have already authed!
if 'asktherelic' in r3.url:
code = r3.url.split('=')[1]
return code
post_data = dict((x[0],x[1]) for x in re.findall('name="(.*?)".*?value="(.*?)"', str(r3.content)))
post_data['submit_access'] = 'true'
post_data['_utf8'] = '☃'
# again, fucked encoding for amp;
action_url = re.findall('action="(.*?)"', str(r3.content))[0].replace('amp;','')
r4 = s.post(action_url, data=post_data, headers=headers, allow_redirects=False)
code = r4.headers['Location'].split('=')[1]
s.close()
return code
@unittest.skipIf("client_id" not in globals(), 'OAuth2 config not setup')
class TestOAuth2(unittest.TestCase):
def test_full_auth_and_access_userdata(self):
auth = OAuth2Method(client_id, client_secret)
auth.setRedirectUri(redirect_url)
url = auth.buildAuthUrl()
token = automated_oauth2_approval(url)
auth.code = token
auth.setAccessToken()
reader = GoogleReader(auth)
info = reader.getUserInfo()
self.assertEqual(dict, type(info))
self.assertEqual(firstname, info['userName'])
def test_oauth_subscribe(self):
auth = OAuth2Method(client_id, client_secret)
auth.setRedirectUri(redirect_url)
url = auth.buildAuthUrl()
token = automated_oauth2_approval(url)
auth.code = token
auth.setAccessToken()
auth.setActionToken()
reader = GoogleReader(auth)
slashdot = 'feed/http://rss.slashdot.org/Slashdot/slashdot'
#unsubscribe always return true; revert feedlist state
self.assertTrue(reader.unsubscribe(slashdot))
# now subscribe
self.assertTrue(reader.subscribe(slashdot))
# wait for server to update
import time
time.sleep(1)
reader.buildSubscriptionList()
# test subscribe successful
self.assertIn(slashdot, [x.id for x in reader.getSubscriptionList()])
if __name__ == '__main__':
unittest.main()