Permalink
Cannot retrieve contributors at this time
Fetching contributors…
| """ | |
| The MIT License | |
| Copyright (c) 2007 Leah Culver | |
| Permission is hereby granted, free of charge, to any person obtaining a copy | |
| of this software and associated documentation files (the "Software"), to deal | |
| in the Software without restriction, including without limitation the rights | |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| copies of the Software, and to permit persons to whom the Software is | |
| furnished to do so, subject to the following conditions: | |
| The above copyright notice and this permission notice shall be included in | |
| all copies or substantial portions of the Software. | |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
| THE SOFTWARE. | |
| """ | |
| import cgi | |
| import urllib | |
| import time | |
| import random | |
| import urlparse | |
| import hmac | |
| import binascii | |
# Protocol version; the default for OAuthRequest.version and
# OAuthServer.version, and the fallback when a request carries no
# oauth_version parameter.
VERSION = '1.0'  # Hi Blaine!

# Default HTTP method for a newly built OAuthRequest.
HTTP_METHOD = 'GET'

# Signature method name assumed by the server when a request does not
# carry an oauth_signature_method parameter.
SIGNATURE_METHOD = 'PLAINTEXT'
class OAuthError(RuntimeError):
    """Generic exception class.

    The text is available as ``message`` and is also passed to
    ``RuntimeError`` so that ``str(error)``, ``error.args`` and tracebacks
    show it.
    """

    def __init__(self, message='OAuth error occured.'):
        # Initialise the base class: without this call str(error) is an
        # empty string and error.args is an empty tuple, which hides the
        # reason in logs and tracebacks.
        super(OAuthError, self).__init__(message)
        self.message = message
def build_authenticate_header(realm=''):
    """Build the optional WWW-Authenticate header for a 401 response.

    Returns a one-entry dict mapping 'WWW-Authenticate' to an OAuth
    challenge that names *realm*.
    """
    challenge = 'OAuth realm="%s"' % realm
    return {'WWW-Authenticate': challenge}
def escape(s):
    """Percent-encode *s* for use in a URL, encoding '/' as well.

    Only '~' is added to the characters urllib.quote always leaves alone.
    """
    keep_unescaped = '~'
    return urllib.quote(s, keep_unescaped)
| def _utf8_str(s): | |
| """Convert unicode to utf-8.""" | |
| if isinstance(s, unicode): | |
| return s.encode("utf-8") | |
| else: | |
| return str(s) | |
def generate_timestamp():
    """Return the current time as whole seconds since the epoch (UTC)."""
    now = time.time()
    return int(now)
def generate_nonce(length=8):
    """Return a string of *length* random decimal digits.

    The digits come from random.SystemRandom (the operating system's
    entropy source) rather than the module-level generator. The module
    generator's state is shared and is copied on fork, so two worker
    processes could otherwise produce the same sequence of nonces.
    """
    rng = random.SystemRandom()
    return ''.join([str(rng.randint(0, 9)) for _ in range(length)])
def generate_verifier(length=8):
    """Return a string of *length* random decimal digits.

    The result is used as a token's oauth_verifier, which has to be
    unguessable. The digits therefore come from random.SystemRandom (the
    operating system's entropy source) instead of the predictable
    module-level generator.
    """
    rng = random.SystemRandom()
    return ''.join([str(rng.randint(0, 9)) for _ in range(length)])
class OAuthConsumer(object):
    """Consumer of OAuth authentication.

    A plain value object holding the Consumer's identity: its key and the
    secret it shares with the Service Provider.
    """
    key = None
    secret = None

    def __init__(self, key, secret):
        """Remember the consumer *key* and shared *secret*."""
        self.key, self.secret = key, secret
class OAuthToken(object):
    """An End User's access or request token.

    key -- the token
    secret -- the token secret

    callback, callback_confirmed and verifier stay None until set through
    set_callback / set_verifier (or from_string for callback_confirmed).
    """
    key = None
    secret = None
    callback = None
    callback_confirmed = None
    verifier = None

    def __init__(self, key, secret):
        """Remember the token *key* and *secret*."""
        self.key, self.secret = key, secret

    def set_callback(self, callback):
        """Store the callback URL and mark it as confirmed ('true')."""
        self.callback = callback
        self.callback_confirmed = 'true'

    def set_verifier(self, verifier=None):
        """Store *verifier*, generating a fresh one when None is given."""
        if verifier is None:
            verifier = generate_verifier()
        self.verifier = verifier

    def get_callback_url(self):
        """Return the callback URL with oauth_verifier added to its query.

        When either the callback or the verifier is unset (falsy), the
        callback is returned untouched.
        """
        if not (self.callback and self.verifier):
            return self.callback
        # Append the oauth_verifier to whatever query is already there.
        pieces = urlparse.urlparse(self.callback)
        scheme, netloc, path, params, query, fragment = pieces[:6]
        if not query:
            query = 'oauth_verifier=%s' % self.verifier
        else:
            query = '%s&oauth_verifier=%s' % (query, self.verifier)
        rebuilt = (scheme, netloc, path, params, query, fragment)
        return urlparse.urlunparse(rebuilt)

    def to_string(self):
        """Serialize the token as a URL-encoded query string."""
        fields = {
            'oauth_token': self.key,
            'oauth_token_secret': self.secret,
        }
        if self.callback_confirmed is not None:
            fields['oauth_callback_confirmed'] = self.callback_confirmed
        return urllib.urlencode(fields)

    @staticmethod
    def from_string(s):
        """ Returns a token from something like:
        oauth_token_secret=xxx&oauth_token=xxx

        Raises KeyError when oauth_token or oauth_token_secret is absent.
        """
        fields = cgi.parse_qs(s, keep_blank_values=False)
        token_key = fields['oauth_token'][0]
        token_secret = fields['oauth_token_secret'][0]
        token = OAuthToken(token_key, token_secret)
        # 1.0 responses carry no oauth_callback_confirmed; leave it None.
        confirmed = fields.get('oauth_callback_confirmed')
        if confirmed is not None:
            token.callback_confirmed = confirmed[0]
        return token

    def __str__(self):
        return self.to_string()
class OAuthRequest(object):
    """OAuthRequest represents the request and can be serialized.

    OAuth parameters:
        - oauth_consumer_key
        - oauth_token
        - oauth_signature_method
        - oauth_signature
        - oauth_timestamp
        - oauth_nonce
        - oauth_version
        - oauth_verifier
        ... any additional parameters, as defined by the Service Provider.
    """
    parameters = None  # OAuth parameters.
    http_method = HTTP_METHOD
    http_url = None
    version = VERSION

    def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None):
        """Store the HTTP method, URL and parameter dict.

        A falsy *parameters* (None or empty) is replaced by a new dict.
        """
        self.http_method = http_method
        self.http_url = http_url
        self.parameters = parameters or {}

    def set_parameter(self, parameter, value):
        """Set (or overwrite) one request parameter."""
        self.parameters[parameter] = value

    def get_parameter(self, parameter):
        """Return one request parameter.

        Raises OAuthError when the parameter is not present.
        """
        try:
            return self.parameters[parameter]
        except (KeyError, TypeError):
            # Only lookup failures are translated; a bare except would
            # also swallow KeyboardInterrupt / SystemExit.
            raise OAuthError('Parameter not found: %s' % parameter)

    def _get_timestamp_nonce(self):
        """Return the (oauth_timestamp, oauth_nonce) pair."""
        return self.get_parameter('oauth_timestamp'), self.get_parameter(
            'oauth_nonce')

    def get_nonoauth_parameters(self):
        """Get any non-OAuth parameters.

        A parameter counts as OAuth only when its name starts with
        'oauth_'. Names that merely contain that text (for example
        'my_oauth_flag') are application parameters and are returned.
        """
        parameters = {}
        for k, v in self.parameters.iteritems():
            # Ignore oauth parameters.
            if not k.startswith('oauth_'):
                parameters[k] = v
        return parameters

    def to_header(self, realm=''):
        """Serialize as a header for an HTTPAuth request.

        Only parameters whose name starts with 'oauth_' are included.
        Values go through _utf8_str, as they do when signing, so unicode
        values are UTF-8 encoded instead of failing in str().
        """
        pieces = ['OAuth realm="%s"' % realm]
        # Add the oauth parameters.
        if self.parameters:
            for k, v in self.parameters.iteritems():
                if k.startswith('oauth_'):
                    pieces.append('%s="%s"' % (k, escape(_utf8_str(v))))
        return {'Authorization': ', '.join(pieces)}

    def to_postdata(self):
        """Serialize as post data for a POST request.

        Keys and values go through _utf8_str, as they do when signing.
        """
        return '&'.join(['%s=%s' % (escape(_utf8_str(k)), escape(_utf8_str(v)))
                         for k, v in self.parameters.iteritems()])

    def to_url(self):
        """Serialize as a URL for a GET request."""
        return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata())

    def get_normalized_parameters(self):
        """Return a string that contains the parameters that must be signed.

        oauth_signature is left out of the result but is NOT removed from
        the request, so computing a base string no longer strips the
        signature from a signed request as a side effect.
        """
        # Escape key values before sorting; skip the signature itself.
        key_values = [(escape(_utf8_str(k)), escape(_utf8_str(v)))
                      for k, v in self.parameters.items()
                      if k != 'oauth_signature']
        # Sort lexicographically, first after key, then after value.
        key_values.sort()
        # Combine key value pairs into a string.
        return '&'.join(['%s=%s' % (k, v) for k, v in key_values])

    def get_normalized_http_method(self):
        """Uppercases the http method."""
        return self.http_method.upper()

    def get_normalized_http_url(self):
        """Parses the URL and rebuilds it to be scheme://host/path."""
        parts = urlparse.urlparse(self.http_url)
        scheme, netloc, path = parts[:3]
        # Exclude default port numbers.
        if scheme == 'http' and netloc[-3:] == ':80':
            netloc = netloc[:-3]
        elif scheme == 'https' and netloc[-4:] == ':443':
            netloc = netloc[:-4]
        return '%s://%s%s' % (scheme, netloc, path)

    def sign_request(self, signature_method, consumer, token):
        """Set the signature parameter to the result of build_signature."""
        # Set the signature method.
        self.set_parameter('oauth_signature_method',
                           signature_method.get_name())
        # Set the signature.
        self.set_parameter('oauth_signature',
                           self.build_signature(signature_method, consumer, token))

    def build_signature(self, signature_method, consumer, token):
        """Calls the build signature method within the signature method."""
        return signature_method.build_signature(self, consumer, token)

    @staticmethod
    def from_request(http_method, http_url, headers=None, parameters=None,
                     query_string=None):
        """Combines multiple parameter sources.

        Sources are merged into *parameters* in this order, later ones
        winning: the given dict, an 'OAuth ' Authorization header, the
        query_string argument, then the query part of http_url. A
        *parameters* dict passed by the caller is updated in place.

        Returns an OAuthRequest, or None when no parameters were found.
        Raises OAuthError when the Authorization header cannot be parsed.
        """
        if parameters is None:
            parameters = {}

        # Headers
        if headers and 'Authorization' in headers:
            auth_header = headers['Authorization']
            # Check that the authorization header is OAuth.
            if auth_header[:6] == 'OAuth ':
                auth_header = auth_header[6:]
                try:
                    # Get the parameters from the header.
                    header_params = OAuthRequest._split_header(auth_header)
                    parameters.update(header_params)
                except Exception:
                    raise OAuthError('Unable to parse OAuth parameters from '
                                     'Authorization header.')

        # GET or POST query string.
        if query_string:
            query_params = OAuthRequest._split_url_string(query_string)
            parameters.update(query_params)

        # URL parameters.
        param_str = urlparse.urlparse(http_url)[4]  # query
        url_params = OAuthRequest._split_url_string(param_str)
        parameters.update(url_params)

        if parameters:
            return OAuthRequest(http_method, http_url, parameters)

        return None

    @staticmethod
    def from_consumer_and_token(oauth_consumer, token=None,
                                callback=None, verifier=None,
                                http_method=HTTP_METHOD,
                                http_url=None, parameters=None):
        """Build a request pre-filled with the standard OAuth parameters.

        The consumer key, a fresh timestamp, a fresh nonce and the version
        are used as defaults; entries in *parameters* override them. With
        a token, its key (and its callback, if set) is added, plus
        *verifier* when given. Without a token, *callback* is added when
        given.
        """
        if not parameters:
            parameters = {}

        defaults = {
            'oauth_consumer_key': oauth_consumer.key,
            'oauth_timestamp': generate_timestamp(),
            'oauth_nonce': generate_nonce(),
            'oauth_version': OAuthRequest.version,
        }

        defaults.update(parameters)
        parameters = defaults

        if token:
            parameters['oauth_token'] = token.key
            if token.callback:
                parameters['oauth_callback'] = token.callback
            # 1.0a support for verifier.
            if verifier:
                parameters['oauth_verifier'] = verifier
        elif callback:
            # 1.0a support for callback in the request token request.
            parameters['oauth_callback'] = callback

        return OAuthRequest(http_method, http_url, parameters)

    @staticmethod
    def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD,
                                http_url=None, parameters=None):
        """Build a request carrying the token key and, optionally, a callback.

        A *parameters* dict passed by the caller is updated in place.
        """
        if not parameters:
            parameters = {}

        parameters['oauth_token'] = token.key

        if callback:
            parameters['oauth_callback'] = callback

        return OAuthRequest(http_method, http_url, parameters)

    @staticmethod
    def _split_header(header):
        """Turn Authorization: header into parameters.

        *header* is the header value with the leading 'OAuth ' removed.
        Only the parameter named exactly 'realm' is dropped; a parameter
        whose name or value merely contains the text 'realm' is kept.
        Raises IndexError for a piece without '='.
        """
        params = {}
        for param in header.split(','):
            # Remove whitespace.
            param = param.strip()
            # Split key-value.
            param_parts = param.split('=', 1)
            # Ignore realm parameter.
            if param_parts[0].strip() == 'realm':
                continue
            # Remove quotes and unescape the value.
            params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"'))
        return params

    @staticmethod
    def _split_url_string(param_str):
        """Turn URL string into parameters.

        Only the first value of a repeated key is kept. parse_qs already
        percent-decodes names and values, so the values are used as they
        are; decoding them again would corrupt any value that contains a
        literal '%' sequence.
        """
        parsed = cgi.parse_qs(param_str, keep_blank_values=False)
        parameters = {}
        for k, v in parsed.iteritems():
            parameters[k] = v[0]
        return parameters
class OAuthServer(object):
    """A worker to check the validity of a request against a data store."""
    timestamp_threshold = 300  # In seconds, five minutes.
    version = VERSION
    signature_methods = None
    data_store = None

    def __init__(self, data_store=None, signature_methods=None):
        """Store the data store and the name -> signature method mapping."""
        self.data_store = data_store
        self.signature_methods = signature_methods or {}

    def set_data_store(self, data_store):
        self.data_store = data_store

    def get_data_store(self):
        return self.data_store

    def add_signature_method(self, signature_method):
        """Register a signature method under its get_name() and return
        the full mapping."""
        self.signature_methods[signature_method.get_name()] = signature_method
        return self.signature_methods

    def fetch_request_token(self, oauth_request):
        """Processes a request_token request and returns the
        request token on success.

        If the request already names a request token the data store
        knows, that token is returned as is. Otherwise the version,
        consumer and signature are checked and a new token is fetched
        from the data store.
        """
        try:
            # Get the request token for authorization.
            token = self._get_token(oauth_request, 'request')
        except OAuthError:
            # No token required for the initial token request.
            self._get_version(oauth_request)  # called for its validation
            consumer = self._get_consumer(oauth_request)
            try:
                callback = self.get_callback(oauth_request)
            except OAuthError:
                callback = None  # 1.0, no callback specified.
            self._check_signature(oauth_request, consumer, None)
            # Fetch a new token.
            token = self.data_store.fetch_request_token(consumer, callback)
        return token

    def fetch_access_token(self, oauth_request):
        """Processes an access_token request and returns the
        access token on success.
        """
        self._get_version(oauth_request)  # called for its validation
        consumer = self._get_consumer(oauth_request)
        try:
            verifier = self._get_verifier(oauth_request)
        except OAuthError:
            verifier = None
        # Get the request token.
        token = self._get_token(oauth_request, 'request')
        self._check_signature(oauth_request, consumer, token)
        new_token = self.data_store.fetch_access_token(consumer, token, verifier)
        return new_token

    def verify_request(self, oauth_request):
        """Verifies an api call and checks all the parameters.

        Returns (consumer, token, non-OAuth parameters).
        """
        # -> consumer and token
        self._get_version(oauth_request)  # called for its validation
        consumer = self._get_consumer(oauth_request)
        # Get the access token.
        token = self._get_token(oauth_request, 'access')
        self._check_signature(oauth_request, consumer, token)
        parameters = oauth_request.get_nonoauth_parameters()
        return consumer, token, parameters

    def authorize_token(self, token, user):
        """Authorize a request token."""
        return self.data_store.authorize_request_token(token, user)

    def get_callback(self, oauth_request):
        """Get the callback URL."""
        return oauth_request.get_parameter('oauth_callback')

    def build_authenticate_header(self, realm=''):
        """Optional support for the authenticate header."""
        return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}

    def _get_version(self, oauth_request):
        """Verify the correct version request for this server.

        A request without oauth_version is treated as VERSION. Raises
        OAuthError when the version differs from this server's.
        """
        try:
            version = oauth_request.get_parameter('oauth_version')
        except OAuthError:
            version = VERSION
        if version and version != self.version:
            raise OAuthError('OAuth version %s not supported.' % str(version))
        return version

    def _get_signature_method(self, oauth_request):
        """Figure out the signature with some defaults.

        Falls back to SIGNATURE_METHOD when the request names none.
        Raises OAuthError when the named method is not registered.
        """
        try:
            signature_method = oauth_request.get_parameter(
                'oauth_signature_method')
        except OAuthError:
            signature_method = SIGNATURE_METHOD
        try:
            # Get the signature method object.
            signature_method = self.signature_methods[signature_method]
        except (KeyError, TypeError):
            signature_method_names = ', '.join(self.signature_methods.keys())
            raise OAuthError('Signature method %s not supported try one of the '
                             'following: %s' % (signature_method, signature_method_names))

        return signature_method

    def _get_consumer(self, oauth_request):
        """Look up the consumer named by oauth_consumer_key.

        Raises OAuthError when the data store returns nothing for it.
        """
        consumer_key = oauth_request.get_parameter('oauth_consumer_key')
        consumer = self.data_store.lookup_consumer(consumer_key)
        if not consumer:
            raise OAuthError('Invalid consumer.')
        return consumer

    def _get_token(self, oauth_request, token_type='access'):
        """Try to find the token for the provided request token key.

        The data store is called as lookup_token(token_type, token_key).
        Raises OAuthError when oauth_token is missing or unknown.
        """
        token_field = oauth_request.get_parameter('oauth_token')
        token = self.data_store.lookup_token(token_type, token_field)
        if not token:
            raise OAuthError('Invalid %s token: %s' % (token_type, token_field))
        return token

    def _get_verifier(self, oauth_request):
        return oauth_request.get_parameter('oauth_verifier')

    def _check_signature(self, oauth_request, consumer, token):
        """Check timestamp, nonce and signature; raise OAuthError on failure."""
        timestamp, nonce = oauth_request._get_timestamp_nonce()
        self._check_timestamp(timestamp)
        self._check_nonce(consumer, token, nonce)
        signature_method = self._get_signature_method(oauth_request)
        try:
            signature = oauth_request.get_parameter('oauth_signature')
        except OAuthError:
            raise OAuthError('Missing signature.')
        # Validate the signature.
        valid_sig = signature_method.check_signature(oauth_request, consumer,
                                                     token, signature)
        if not valid_sig:
            base = signature_method.build_signature_base_string(
                oauth_request, consumer, token)[1]
            raise OAuthError('Invalid signature. Expected signature base '
                             'string: %s' % base)
        # The signature used to be rebuilt once more here and the result
        # thrown away; check_signature has already built it.

    def _check_timestamp(self, timestamp):
        """Verify that timestamp is recentish.

        Raises OAuthError when the timestamp is not an integer or lies
        more than timestamp_threshold seconds from now, in either
        direction.
        """
        try:
            timestamp = int(timestamp)
        except (TypeError, ValueError):
            # The value comes straight from the request; report it as an
            # OAuth failure rather than leaking a ValueError.
            raise OAuthError('Invalid timestamp: %r' % (timestamp,))
        now = int(time.time())
        lapsed = abs(now - timestamp)
        if lapsed > self.timestamp_threshold:
            raise OAuthError('Expired timestamp: given %d and now %s has a '
                             'greater difference than threshold %d' %
                             (timestamp, now, self.timestamp_threshold))

    def _check_nonce(self, consumer, token, nonce):
        """Verify that the nonce is uniqueish.

        A truthy result from the data store means the nonce was already
        used and raises OAuthError.
        """
        nonce = self.data_store.lookup_nonce(consumer, token, nonce)
        if nonce:
            raise OAuthError('Nonce already used: %s' % str(nonce))
class OAuthClient(object):
    """OAuthClient is a worker to attempt to execute a request.

    It holds a consumer and a token; the three request methods are
    placeholders for subclasses and raise NotImplementedError here.
    """
    consumer = None
    token = None

    def __init__(self, oauth_consumer, oauth_token):
        """Remember the consumer and token this client acts for."""
        self.consumer, self.token = oauth_consumer, oauth_token

    def get_consumer(self):
        """Return the stored consumer."""
        return self.consumer

    def get_token(self):
        """Return the stored token."""
        return self.token

    def fetch_request_token(self, oauth_request):
        """-> OAuthToken."""
        raise NotImplementedError()

    def fetch_access_token(self, oauth_request):
        """-> OAuthToken."""
        raise NotImplementedError()

    def access_resource(self, oauth_request):
        """-> Some protected resource."""
        raise NotImplementedError()
class OAuthDataStore(object):
    """A database abstraction used to lookup consumers and tokens.

    Every method here is a placeholder that raises NotImplementedError;
    a concrete store overrides them.
    """

    def lookup_consumer(self, key):
        """-> OAuthConsumer."""
        raise NotImplementedError()

    def lookup_token(self, oauth_consumer, token_type, token_token):
        """-> OAuthToken.

        NOTE(review): OAuthServer._get_token calls this with only two
        arguments, (token_type, token_key), which does not match the
        three parameters declared here -- confirm which form concrete
        stores are expected to implement.
        """
        raise NotImplementedError()

    def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
        """-> OAuthToken.

        OAuthServer._check_nonce treats any truthy return value as
        "nonce already used".
        """
        raise NotImplementedError()

    def fetch_request_token(self, oauth_consumer, oauth_callback):
        """-> OAuthToken."""
        raise NotImplementedError()

    def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
        """-> OAuthToken."""
        raise NotImplementedError()

    def authorize_request_token(self, oauth_token, user):
        """-> OAuthToken."""
        raise NotImplementedError()
class OAuthSignatureMethod(object):
    """A strategy class that implements a signature method.

    Subclasses provide get_name, build_signature_base_string and
    build_signature; check_signature is implemented here on top of
    build_signature.
    """

    def get_name(self):
        """-> str."""
        raise NotImplementedError

    def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token):
        """-> str key, str raw."""
        raise NotImplementedError

    def build_signature(self, oauth_request, oauth_consumer, oauth_token):
        """-> str."""
        raise NotImplementedError

    def check_signature(self, oauth_request, consumer, token, signature):
        """Return True when *signature* equals the signature built for
        the request.

        The comparison looks at every character instead of stopping at
        the first difference, so the time taken does not reveal how much
        of a guessed signature was correct. It is hand-written rather
        than hmac.compare_digest because the latter is missing from
        older Python 2 releases.
        """
        built = self.build_signature(oauth_request, consumer, token)
        try:
            if len(built) != len(signature):
                return False
            difference = 0
            for expected_char, supplied_char in zip(built, signature):
                difference |= ord(expected_char) ^ ord(supplied_char)
        except TypeError:
            # Not character strings (e.g. None): plain equality decides.
            return built == signature
        return difference == 0
class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod):
    """HMAC-SHA1 signature method."""

    def get_name(self):
        return 'HMAC-SHA1'

    def build_signature_base_string(self, oauth_request, consumer, token):
        """Return (key, raw).

        raw is the escaped HTTP method, URL and normalized parameters
        joined with '&'. key is the escaped consumer secret, '&', and
        the escaped token secret when a token is given.
        """
        sig = (
            escape(oauth_request.get_normalized_http_method()),
            escape(oauth_request.get_normalized_http_url()),
            escape(oauth_request.get_normalized_parameters()),
        )

        key = '%s&' % escape(consumer.secret)
        if token:
            key += escape(token.secret)
        raw = '&'.join(sig)
        return key, raw

    def build_signature(self, oauth_request, consumer, token):
        """Return the base64 HMAC-SHA1 digest of the signature base string."""
        key, raw = self.build_signature_base_string(oauth_request, consumer,
                                                    token)

        # Only the import is guarded. Previously hmac.new sat inside a
        # bare except as well, so a genuine HMAC error was swallowed and
        # replaced by whatever the 'sha' fallback raised.
        try:
            import hashlib  # 2.5
            digestmod = hashlib.sha1
        except ImportError:
            import sha  # Deprecated
            digestmod = sha

        # HMAC object.
        hashed = hmac.new(key, raw, digestmod)

        # Calculate the digest base 64 (drop b2a_base64's trailing newline).
        return binascii.b2a_base64(hashed.digest())[:-1]
class OAuthSignatureMethod_PLAINTEXT(OAuthSignatureMethod):
    """PLAINTEXT signature method: the signature is the secrets themselves."""

    def get_name(self):
        return 'PLAINTEXT'

    def build_signature_base_string(self, oauth_request, consumer, token):
        """Join the escaped consumer secret and token secret with '&'.

        The token part is empty when no token is given. The same string
        is returned twice, as both key and raw.
        """
        joined = '%s&' % escape(consumer.secret)
        if token:
            joined = joined + escape(token.secret)
        return joined, joined

    def build_signature(self, oauth_request, consumer, token):
        """Return the key half of build_signature_base_string."""
        pair = self.build_signature_base_string(oauth_request, consumer,
                                                token)
        return pair[0]