Permalink
Cannot retrieve contributors at this time
Fetching contributors…
| # Tweepy | |
| # Copyright 2009-2010 Joshua Roesslein | |
| # See LICENSE for details. | |
| from urllib2 import Request, urlopen | |
| import base64 | |
| from tweepy import oauth | |
| from tweepy.error import TweepError | |
| from tweepy.api import API | |
| class AuthHandler(object): | |
| def apply_auth(self, url, method, headers, parameters): | |
| """Apply authentication headers to request""" | |
| raise NotImplementedError | |
| def get_username(self): | |
| """Return the username of the authenticated user""" | |
| raise NotImplementedError | |
| class OAuthHandler(AuthHandler): | |
| """OAuth authentication handler""" | |
| OAUTH_HOST = 'api.twitter.com' | |
| OAUTH_ROOT = '/oauth/' | |
| def __init__(self, consumer_key, consumer_secret, callback=None, secure=True): | |
| if type(consumer_key) == unicode: | |
| consumer_key = bytes(consumer_key) | |
| if type(consumer_secret) == unicode: | |
| consumer_secret = bytes(consumer_secret) | |
| self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret) | |
| self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1() | |
| self.request_token = None | |
| self.access_token = None | |
| self.callback = callback | |
| self.username = None | |
| self.secure = secure | |
| def _get_oauth_url(self, endpoint, secure=True): | |
| if self.secure or secure: | |
| prefix = 'https://' | |
| else: | |
| prefix = 'http://' | |
| return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint | |
| def apply_auth(self, url, method, headers, parameters): | |
| request = oauth.OAuthRequest.from_consumer_and_token( | |
| self._consumer, http_url=url, http_method=method, | |
| token=self.access_token, parameters=parameters | |
| ) | |
| request.sign_request(self._sigmethod, self._consumer, self.access_token) | |
| headers.update(request.to_header()) | |
| def _get_request_token(self): | |
| try: | |
| url = self._get_oauth_url('request_token') | |
| request = oauth.OAuthRequest.from_consumer_and_token( | |
| self._consumer, http_url=url, callback=self.callback | |
| ) | |
| request.sign_request(self._sigmethod, self._consumer, None) | |
| resp = urlopen(Request(url, headers=request.to_header())) | |
| return oauth.OAuthToken.from_string(resp.read()) | |
| except Exception, e: | |
| raise TweepError(e) | |
| def set_request_token(self, key, secret): | |
| self.request_token = oauth.OAuthToken(key, secret) | |
| def set_access_token(self, key, secret): | |
| self.access_token = oauth.OAuthToken(key, secret) | |
| def get_authorization_url(self, signin_with_twitter=False): | |
| """Get the authorization URL to redirect the user""" | |
| try: | |
| # get the request token | |
| self.request_token = self._get_request_token() | |
| # build auth request and return as url | |
| if signin_with_twitter: | |
| url = self._get_oauth_url('authenticate') | |
| else: | |
| url = self._get_oauth_url('authorize') | |
| request = oauth.OAuthRequest.from_token_and_callback( | |
| token=self.request_token, http_url=url | |
| ) | |
| return request.to_url() | |
| except Exception, e: | |
| raise TweepError(e) | |
| def get_access_token(self, verifier=None): | |
| """ | |
| After user has authorized the request token, get access token | |
| with user supplied verifier. | |
| """ | |
| try: | |
| url = self._get_oauth_url('access_token') | |
| # build request | |
| request = oauth.OAuthRequest.from_consumer_and_token( | |
| self._consumer, | |
| token=self.request_token, http_url=url, | |
| verifier=str(verifier) | |
| ) | |
| request.sign_request(self._sigmethod, self._consumer, self.request_token) | |
| # send request | |
| resp = urlopen(Request(url, headers=request.to_header())) | |
| self.access_token = oauth.OAuthToken.from_string(resp.read()) | |
| return self.access_token | |
| except Exception, e: | |
| raise TweepError(e) | |
| def get_xauth_access_token(self, username, password): | |
| """ | |
| Get an access token from an username and password combination. | |
| In order to get this working you need to create an app at | |
| http://twitter.com/apps, after that send a mail to api@twitter.com | |
| and request activation of xAuth for it. | |
| """ | |
| try: | |
| url = self._get_oauth_url('access_token', secure=True) # must use HTTPS | |
| request = oauth.OAuthRequest.from_consumer_and_token( | |
| oauth_consumer=self._consumer, | |
| http_method='POST', http_url=url, | |
| parameters = { | |
| 'x_auth_mode': 'client_auth', | |
| 'x_auth_username': username, | |
| 'x_auth_password': password | |
| } | |
| ) | |
| request.sign_request(self._sigmethod, self._consumer, None) | |
| resp = urlopen(Request(url, data=request.to_postdata())) | |
| self.access_token = oauth.OAuthToken.from_string(resp.read()) | |
| return self.access_token | |
| except Exception, e: | |
| raise TweepError(e) | |
| def get_username(self): | |
| if self.username is None: | |
| api = API(self) | |
| user = api.verify_credentials() | |
| if user: | |
| self.username = user.screen_name | |
| else: | |
| raise TweepError("Unable to get username, invalid oauth token!") | |
| return self.username | |