Skip to content

Commit

Permalink
Improvements to Twitter login handler
Browse files Browse the repository at this point in the history
* Get better avatar size
* Handle OAuth exceptions correctly
* Clean up after ourselves
  • Loading branch information
fluffy-critter committed Aug 2, 2020
1 parent 6df6769 commit 4ede367
Showing 1 changed file with 72 additions and 46 deletions.
118 changes: 72 additions & 46 deletions authl/handlers/twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import logging
import re
import time
import urllib.parse
Expand All @@ -28,6 +29,8 @@
from .. import disposition, utils
from . import Handler

LOGGER = logging.getLogger(__name__)


class Twitter(Handler):
"""
Expand Down Expand Up @@ -95,26 +98,29 @@ def initiate_auth(self, id_url, callback_uri, redir):
if not match:
return disposition.Error("Got invalid Twitter URL", redir)

username = match.group(2)
oauth_session = OAuth1Session(
client_key=self._client_key,
client_secret=self._client_secret,
callback_uri=callback_uri)
try:
username = match.group(2)
oauth_session = OAuth1Session(
client_key=self._client_key,
client_secret=self._client_secret,
callback_uri=callback_uri)

req = oauth_session.fetch_request_token('https://api.twitter.com/oauth/request_token')
req = oauth_session.fetch_request_token('https://api.twitter.com/oauth/request_token')

token = req.get('oauth_token')
params = {
'oauth_token': token,
'oauth_token_secret': req.get('oauth_token_secret')
}
if username:
params['screen_name'] = username
token = req.get('oauth_token')
params = {
'oauth_token': token,
'oauth_token_secret': req.get('oauth_token_secret')
}
if username:
params['screen_name'] = username

self._pending[token] = (params, callback_uri, redir, time.time())
self._pending[token] = (params, callback_uri, redir, time.time())

return disposition.Redirect(
'https://api.twitter.com/oauth/authorize?' + urllib.parse.urlencode(params))
return disposition.Redirect(
'https://api.twitter.com/oauth/authorize?' + urllib.parse.urlencode(params))
except Exception as err: # pylint:disable=broad-except
return disposition.Error(err, redir)

def check_callback(self, url, get, data):
if 'denied' in get:
Expand All @@ -135,36 +141,47 @@ def check_callback(self, url, get, data):
if 'denied' in get or 'oauth_verifier' not in get:
return disposition.Error("Twitter authorization declined", redir)

oauth_session = OAuth1Session(
client_key=self._client_key,
client_secret=self._client_secret,
resource_owner_key=params['oauth_token'],
resource_owner_secret=params['oauth_token_secret'],
callback_uri=callback_uri)

oauth_session.parse_authorization_response(url)

request = oauth_session.fetch_access_token('https://api.twitter.com/oauth/access_token')
auth = OAuth1(
client_key=self._client_key,
client_secret=self._client_secret,
resource_owner_key=request.get('oauth_token'),
resource_owner_secret=request.get('oauth_token_secret'))

verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json?skip_status=1'
user_info = requests.get(
verify_url, auth=auth).json()
if 'errors' in user_info:
return disposition.Error(
"Could not retrieve credentials: %r" % user_info.get('errors'),
redir)

return disposition.Verified(
# We include the user ID after the hash code to prevent folks from
# logging in by taking over a username that someone changed/abandoned.
f'https://twitter.com/{user_info["screen_name"]}#{user_info["id_str"]}',
redir,
self._build_profile(user_info))
try:
oauth_session = OAuth1Session(
client_key=self._client_key,
client_secret=self._client_secret,
resource_owner_key=params['oauth_token'],
resource_owner_secret=params['oauth_token_secret'],
callback_uri=callback_uri)

oauth_session.parse_authorization_response(url)

request = oauth_session.fetch_access_token('https://api.twitter.com/oauth/access_token')
token = request.get('oauth_token')
auth = OAuth1(
client_key=self._client_key,
client_secret=self._client_secret,
resource_owner_key=token,
resource_owner_secret=request.get('oauth_token_secret'))

user_info = requests.get(
'https://api.twitter.com/1.1/account/verify_credentials.json?skip_status=1',
auth=auth).json()
if 'errors' in user_info:
result = disposition.Error(
f"Could not retrieve credentials: {user_info.get('errors')}",
redir)
else:
result = disposition.Verified(
# We include the user ID after the hash code to prevent folks from
# logging in by taking over a username that someone changed/abandoned.
f'https://twitter.com/{user_info["screen_name"]}#{user_info["id_str"]}',
redir,
self._build_profile(user_info))

# let's clean up after ourselves
request = requests.post('https://api.twitter.com/1.1/oauth/invalidate_token.json',
auth=auth)
LOGGER.debug("Token revocation request: %d %s", request.status_code, request.text)

return result
except Exception as err: # pylint:disable=broad-except
return disposition.Error(err, redir)

@property
def generic_url(self):
Expand Down Expand Up @@ -194,6 +211,15 @@ def expand_entities(name):
profile = {p_key: expand_entities(t_key)
for p_key, t_key in mapping if t_key in user_info}

# attempt to get a more suitable image
if 'avatar' in profile:
for rendition in ('_400x400', ''):
req = requests.head(profile['avatar'].replace('_normal', rendition))
if 200 <= req.status_code < 300:
LOGGER.info("Found better avatar rendition: %s", req.url)
profile['avatar'] = req.url
break

return {k: v for k, v in profile.items() if v}


Expand Down

0 comments on commit 4ede367

Please sign in to comment.