/
oauth.py
174 lines (125 loc) · 5.53 KB
/
oauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import logging
import simplejson
import urllib
import time
import string
import random
import base64
import os
import hashlib
import urllib2
import hmac
from oauthwhat.model import OAuthToken, Session
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class OAuthConsumer(object):
    """Base OAuth 1.0 consumer: fetches, signs with and exchanges tokens.

    Subclasses provide the three endpoint URLs and implement
    ``get_specifier``. ``consumer_key`` and ``consumer_secret`` must be
    assigned on the class (or instance) before any request is signed.
    """

    request_token_url = None
    access_token_url = None
    user_auth_url = None

    # Set these manually (hella dirty hack)
    consumer_key = None
    consumer_secret = None

    class UnknownSignatureException(Exception):
        """Raised when ``_gen_signed_data`` gets an unsupported sign_method."""

    class UnauthorizedToken(Exception):
        """Raised when the access-token request fails with an IOError."""

    class AlreadyAuthorized(Exception):
        """Raised when the current token is already marked authorized."""

    # We have to do this so we don't get prompted with basic auth
    opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler())

    def __init__(self, token=None):
        """Bind the consumer to a token.

        ``token`` may be ``None`` (no token yet), an ``OAuthToken`` instance,
        or a token string, in which case the matching ``OAuthToken`` row is
        looked up (``self.token`` is ``None`` if no row matches).

        Raises ``TypeError`` for any other argument type instead of silently
        leaving ``self.token`` undefined.
        """
        if token is None or isinstance(token, OAuthToken):
            self.token = token
        elif isinstance(token, basestring):
            self.token = Session.query(OAuthToken).filter(
                OAuthToken.oauth_token == token).first()
        else:
            raise TypeError(
                "token must be None, an OAuthToken or a string, not %s"
                % type(token).__name__)

    def get_new_request_token(self):
        """Request a fresh request token, persist it and return its value.

        Raises ``Exception`` if this consumer already holds a token.
        """
        if self.token is not None:
            raise Exception("We already have a token, we shouldn't get here")
        token, token_secret = self._request_token()
        self.token = OAuthToken(token, token_secret)
        self.token.authorized = False
        self.token.specifier = None
        Session.add(self.token)
        Session.commit()
        return token

    def get_auth_url(self, **args):
        """Return ``user_auth_url`` with signed OAuth parameters appended."""
        dat = self._gen_signed_data(self.user_auth_url, **args)
        return self.user_auth_url + "?" + urllib.urlencode(dat)

    def _request_token(self):
        """POST to ``request_token_url``; return ``(token, token_secret)``."""
        token_info = self.get_post_lines(self.request_token_url)
        dat = self._parse_token_response(token_info)
        return dat['oauth_token'], dat['oauth_token_secret']

    def exchange_access_token(self):
        """Swap the current request token for an access token.

        The request-token row is deleted, the access token is stored (or an
        existing row with the same token value is reused), its ``specifier``
        is filled from ``get_specifier()`` and the session is committed.

        Raises ``AlreadyAuthorized`` if the current token is already
        authorized, and ``UnauthorizedToken`` if the access-token request
        fails with an ``IOError``.
        """
        if self.token.authorized:
            raise self.AlreadyAuthorized("This user has already been authorized")
        try:
            token_info = self.get_post_lines(self.access_token_url)
        except IOError:
            raise self.UnauthorizedToken("Has not been authorized yet")
        log.debug(token_info)
        dat = self._parse_token_response(token_info)
        token, token_secret = dat['oauth_token'], dat['oauth_token_secret']
        Session.delete(self.token)
        self.token = Session.query(OAuthToken).filter(
            OAuthToken.oauth_token == token).first()
        log.debug("Toooken")
        if self.token is None:
            self.token = OAuthToken(token, token_secret, authorized=True)
            Session.add(self.token)
        log.debug("getting specifier...")
        self.token.specifier = self.get_specifier()
        log.debug("Specifier = %s", self.token.specifier)
        Session.commit()
        # NOTE: an unconditional ``raise Exception()`` used to follow the
        # commit, which made every successful exchange look like a failure.

    def _gen_signed_data(self, base_url, method="GET", sign_method='HMAC-SHA1', **params):
        """Build the OAuth parameter dict for a request, including signature.

        ``params`` are merged over the generated ``oauth_*`` defaults, the
        current token (if any) is added, and the result is signed with
        ``sign_method``. Only ``'HMAC-SHA1'`` is supported.

        Returns the dict of parameters to send.
        Raises ``UnknownSignatureException`` for any other ``sign_method`` and
        ``ValueError`` if the consumer key or secret has not been set.
        """
        # Add other sign_methods here
        if sign_method != 'HMAC-SHA1':
            raise self.UnknownSignatureException(
                "Unknown signature method %s" % sign_method)
        if self.consumer_key is None or self.consumer_secret is None:
            raise ValueError(
                "consumer_key and consumer_secret must be set before signing")

        args = {'oauth_consumer_key': self.consumer_key,
                'oauth_timestamp': self.__timestamp(),
                'oauth_nonce': self.__nonce(),
                'oauth_version': '1.0'}
        args.update(params)
        args['oauth_signature_method'] = 'HMAC-SHA1'

        escape = self._escape
        # Both halves of the key are percent-encoded; the token half stays
        # empty until a token is known.
        key = escape(self.consumer_secret) + "&"
        if self.token is not None:
            args['oauth_token'] = self.token.oauth_token
            key += escape(self.token.oauth_token_secret)

        # urlencode() is not used here: it writes spaces as '+' and escapes
        # '~', while the signature base string needs strict percent-encoding
        # of each name and value before sorting.
        pairs = sorted((escape(k), escape(v)) for k, v in args.iteritems())
        normalized = '&'.join('%s=%s' % pair for pair in pairs)
        message = '&'.join(
            escape(part) for part in (method.upper(), base_url, normalized))

        digest = hmac.new(key, message, hashlib.sha1).digest()
        args['oauth_signature'] = base64.b64encode(digest)
        return args

    def get_get(self, url, **args):
        """Issue a signed GET to ``url`` and return the open response."""
        data = self._gen_signed_data(url, method="GET", **args)
        log.debug("%s %s", url, data)
        uurl = "%s?%s" % (url, urllib.urlencode(data))
        log.debug(uurl)
        return self.opener.open(uurl)

    def get_post(self, url, **args):
        """Issue a signed POST to ``url`` and return the open response."""
        data = self._gen_signed_data(url, method="POST", **args)
        log.debug("%s %s", url, data)
        return self.opener.open(url, urllib.urlencode(data))

    def get_post_lines(self, url, **args):
        """Issue a signed POST and return the whole response body as a string."""
        response = self.get_post(url, **args)
        try:
            return response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()

    def get_specifier(self):
        """Return an identifier for the authorized user.

        The base class only handles authorization, so this always raises
        ``NotImplementedError``; subclasses must override it.
        """
        raise NotImplementedError(
            "One must implement authentication, main class is only for authorization")

    @staticmethod
    def _parse_token_response(body):
        """Parse a ``k=v&k=v`` token response into a dict.

        Each pair is split on the first ``=`` only, so values containing
        ``=`` survive, and percent-escapes in names and values are decoded.
        """
        fields = {}
        for item in body.strip().split('&'):
            if not item:
                continue
            name, _, value = item.partition('=')
            fields[urllib.unquote(name)] = urllib.unquote(value)
        return fields

    @staticmethod
    def _escape(value):
        """Percent-encode ``value``, leaving only letters, digits and ``-._~``."""
        if isinstance(value, unicode):
            # hmac and urllib.quote need byte strings.
            value = value.encode('utf-8')
        return urllib.quote(str(value), safe='~')

    @staticmethod
    def __randstr(leng):
        """Return ``leng`` URL-safe characters derived from ``os.urandom``."""
        return base64.urlsafe_b64encode(os.urandom(leng))[:leng]

    @staticmethod
    def __timestamp():
        """Return the current Unix time as an int."""
        return int(time.time())

    @staticmethod
    def __nonce(leng=16):
        """Return a random nonce of ``leng`` characters."""
        return OAuthConsumer.__randstr(leng)
class OAuthTwitterConsumer(OAuthConsumer):
    """OAuthConsumer configured with Twitter's endpoint URLs."""

    # OAuth endpoints used by the inherited token methods.
    request_token_url = 'https://twitter.com/oauth/request_token'
    access_token_url = 'https://twitter.com/oauth/access_token'
    user_auth_url = 'http://twitter.com/oauth/authorize'

    # URL fetched by get_specifier() to identify the user.
    twitter_verify_url = 'http://twitter.com/account/verify_credentials.json'

    # Not referenced elsewhere in this file.
    default_api_prefix = 'http://twitter.com'
    default_api_suffix = '.json'

    def get_specifier(self):
        """Return the ``screen_name`` from the JSON at ``twitter_verify_url``.

        The URL is fetched with a signed GET via ``get_get``.
        """
        response = self.get_get(self.twitter_verify_url)
        credentials = simplejson.load(response)
        return credentials['screen_name']