Skip to content

Commit

Permalink
Merge pull request #85 from PlaidWeb/feature/twitter-testability
Browse files Browse the repository at this point in the history
Test coverage to 100%
  • Loading branch information
fluffy-critter committed Aug 12, 2020
2 parents 5cbcb59 + e8401b3 commit c2f0d75
Show file tree
Hide file tree
Showing 10 changed files with 810 additions and 646 deletions.
4 changes: 2 additions & 2 deletions authl/disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Verified(Disposition):
"""

def __init__(self, identity, redir, profile=None):
def __init__(self, identity: str, redir: str, profile: dict = None):
self.identity = identity
self.redir = redir
self.profile = profile or {}
Expand Down Expand Up @@ -88,7 +88,7 @@ class Error(Disposition):
"""

def __init__(self, message, redir: str):
self.message = message
self.message = str(message)
self.redir = redir

def __str__(self):
Expand Down
60 changes: 28 additions & 32 deletions authl/handlers/twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ def __init__(self, client_key: str,
# pylint:disable=too-many-arguments
self._client_key = client_key
self._client_secret = client_secret
self._timeout = timeout or 600
self._pending = expiringdict.ExpiringDict(
max_len=128,
max_age_seconds=timeout) if storage is None else storage
self._timeout = timeout or 600
max_age_seconds=self._timeout) if storage is None else storage

# regex to match a twitter URL and optionally extract the username
twitter_regex = re.compile(r'(https?://)?[^/]*\.?twitter\.com/?@?([^?]*)')
Expand All @@ -87,7 +87,7 @@ def handles_url(url):
if match:
return 'https://twitter.com/' + match.group(2)

return False
return None

@property
def cb_id(self):
Expand All @@ -108,14 +108,15 @@ def initiate_auth(self, id_url, callback_uri, redir):
req = oauth_session.fetch_request_token('https://api.twitter.com/oauth/request_token')

token = req.get('oauth_token')
secret = req.get('oauth_token_secret')

params = {
'oauth_token': token,
'oauth_token_secret': req.get('oauth_token_secret')
}
if username:
params['screen_name'] = username

self._pending[token] = (params, callback_uri, redir, time.time())
self._pending[token] = (secret, callback_uri, redir, time.time())

return disposition.Redirect(
'https://api.twitter.com/oauth/authorize?' + urllib.parse.urlencode(params))
Expand All @@ -130,23 +131,22 @@ def check_callback(self, url, get, data):
if not token or token not in self._pending:
return disposition.Error("Invalid transaction", '')

try:
params, callback_uri, redir, start_time = self._pending.pop(token)
except ValueError:
return disposition.Error("Invalid token", '')
secret, callback_uri, redir, start_time = self._pending.pop(token)

if time.time() > start_time + self._timeout:
return disposition.Error("Login timed out", redir)

if 'denied' in get or 'oauth_verifier' not in get:
return disposition.Error("Twitter authorization declined", redir)

auth = None

try:
oauth_session = OAuth1Session(
client_key=self._client_key,
client_secret=self._client_secret,
resource_owner_key=params['oauth_token'],
resource_owner_secret=params['oauth_token_secret'],
resource_owner_key=token,
resource_owner_secret=secret,
callback_uri=callback_uri)

oauth_session.parse_authorization_response(url)
Expand All @@ -162,34 +162,30 @@ def check_callback(self, url, get, data):
user_info = requests.get(
'https://api.twitter.com/1.1/account/verify_credentials.json?skip_status=1',
auth=auth).json()
if 'errors' in user_info:
result = disposition.Error(
f"Could not retrieve credentials: {user_info.get('errors')}",
redir)
else:
result = disposition.Verified(
# We include the user ID after the hash code to prevent folks from
# logging in by taking over a username that someone changed/abandoned.
f'https://twitter.com/{user_info["screen_name"]}#{user_info["id_str"]}',
redir,
self._build_profile(user_info))

# let's clean up after ourselves
request = requests.post('https://api.twitter.com/1.1/oauth/invalidate_token.json',
auth=auth)
LOGGER.debug("Token revocation request: %d %s", request.status_code, request.text)

return result
LOGGER.log(logging.WARNING if 'errors' in user_info else logging.NOTSET,
"User profile showed error: %s", user_info.get('errors'))
return disposition.Verified(
# We include the user ID after the hash code to prevent folks from
# logging in by taking over a username that someone changed/abandoned.
f'https://twitter.com/{user_info["screen_name"]}#{user_info["id_str"]}',
redir,
self.build_profile(user_info))
except Exception as err: # pylint:disable=broad-except
return disposition.Error(err, redir)
return disposition.Error(str(err), redir)
finally:
if auth:
# let's clean up after ourselves
request = requests.post('https://api.twitter.com/1.1/oauth/invalidate_token.json',
auth=auth)
LOGGER.debug("Token revocation request: %d %s", request.status_code, request.text)

@property
def generic_url(self):
return 'https://twitter.com/'

@staticmethod
def _build_profile(user_info: dict) -> dict:
# Get the basic profile
def build_profile(user_info: dict) -> dict:
""" Convert a Twitter userinfo JSON into an Authl profile """
entities = user_info.get('entities', {})

def expand_entities(name):
Expand Down
193 changes: 95 additions & 98 deletions tests/handlers/test_emailaddr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# pylint:disable=missing-docstring

import logging
import unittest.mock

from authl import disposition, tokens
from authl.handlers import email_addr
Expand All @@ -12,7 +11,7 @@
LOGGER = logging.getLogger(__name__)


def test_fixtures():
def test_basics():
handler = email_addr.EmailAddress(None, None, tokens.DictStore())
assert handler.service_name == 'Email'
assert handler.url_schemes
Expand Down Expand Up @@ -54,7 +53,7 @@ def do_callback(message):
assert result.cdata == 'some data'


def test_failures():
def test_failures(mocker):
store = {}
pending = {}

Expand Down Expand Up @@ -87,20 +86,20 @@ def check_pending(addr):
return handler.check_callback(url, parse_args(url), {})

# check for timeout failure
with unittest.mock.patch('time.time') as mock_time:
mock_time.return_value = 30
mock_time = mocker.patch('time.time')
mock_time.return_value = 30

assert len(store) == 0
initiate('timeout@example.com', '/timeout')
assert len(store) == 1
assert len(store) == 0
initiate('timeout@example.com', '/timeout')
assert len(store) == 1

mock_time.return_value = 20000
mock_time.return_value = 20000

result = check_pending('timeout@example.com')
assert isinstance(result, disposition.Error)
assert 'timed out' in result.message
assert result.redir == '/timeout'
assert len(store) == 0
result = check_pending('timeout@example.com')
assert isinstance(result, disposition.Error)
assert 'timed out' in result.message
assert result.redir == '/timeout'
assert len(store) == 0

# check for replay attacks
assert len(store) == 0
Expand All @@ -117,28 +116,28 @@ def check_pending(addr):
assert 'Invalid token' in str(result2)


def test_connector():
with unittest.mock.patch('smtplib.SMTP_SSL') as mock_smtp_ssl,\
unittest.mock.patch('ssl.SSLContext') as mock_ssl:
import ssl
def test_connector(mocker):
import ssl
mock_smtp_ssl = mocker.patch('smtplib.SMTP_SSL')
mock_ssl = mocker.patch('ssl.SSLContext')

conn = unittest.mock.MagicMock()
mock_smtp_ssl.return_value = conn
conn = mocker.MagicMock()
mock_smtp_ssl.return_value = conn

connector = email_addr.smtplib_connector('localhost', 25,
'test', 'poiufojar',
use_ssl=True)
connector()
connector = email_addr.smtplib_connector('localhost', 25,
'test', 'poiufojar',
use_ssl=True)
connector()

mock_smtp_ssl.assert_called_with('localhost', 25)
mock_ssl.assert_called_with(ssl.PROTOCOL_TLS_CLIENT)
conn.ehlo.assert_called()
conn.starttls.assert_called()
conn.login.assert_called_with('test', 'poiufojar')
mock_smtp_ssl.assert_called_with('localhost', 25)
mock_ssl.assert_called_with(ssl.PROTOCOL_TLS_CLIENT)
conn.ehlo.assert_called()
conn.starttls.assert_called()
conn.login.assert_called_with('test', 'poiufojar')


def test_simple_sendmail():
connector = unittest.mock.MagicMock(name='connector')
def test_simple_sendmail(mocker):
connector = mocker.MagicMock(name='connector')

import email
message = email.message.EmailMessage()
Expand All @@ -158,80 +157,78 @@ def test_simple_sendmail():
assert message['Subject'] == 'test subject'


def test_from_config():
def test_from_config(mocker):
store = {}
mock_open = unittest.mock.mock_open(read_data="test template content")

with unittest.mock.patch('smtplib.SMTP') as mock_smtp,\
unittest.mock.patch('builtins.open', mock_open):
conn = unittest.mock.MagicMock()
mock_smtp.return_value = conn
mock_open = mocker.patch('builtins.open', mocker.mock_open(read_data='template'))
mock_smtp = mocker.patch('smtplib.SMTP')
conn = mocker.MagicMock()
mock_smtp.return_value = conn

handler = email_addr.from_config({
'EMAIL_FROM': 'sender@example.com',
'EMAIL_SUBJECT': 'test subject',
'EMAIL_CHECK_MESSAGE': 'check yr email',
'EMAIL_TEMPLATE_FILE': 'template.txt',
'EMAIL_EXPIRE_TIME': 37,
'SMTP_HOST': 'smtp.example.com',
'SMTP_PORT': 587,
'SMTP_USE_SSL': False,
}, tokens.DictStore(store))

mock_open.assert_called_with('template.txt')
res = handler.initiate_auth('mailto:alice@bob.example', 'http://cb/', '/redir')
assert res.cdata['message'] == 'check yr email'

handler = email_addr.from_config({
'EMAIL_FROM': 'sender@example.com',
'EMAIL_SUBJECT': 'test subject',
'EMAIL_CHECK_MESSAGE': 'check yr email',
'EMAIL_TEMPLATE_FILE': 'template.txt',
'EMAIL_EXPIRE_TIME': 37,
'SMTP_HOST': 'smtp.example.com',
'SMTP_PORT': 587,
'SMTP_USE_SSL': False,
}, tokens.DictStore(store))

mock_open.assert_called_with('template.txt')
res = handler.initiate_auth('mailto:alice@bob.example', 'http://cb/', '/redir')
assert res.cdata['message'] == 'check yr email'

assert len(store) == 1
mock_smtp.assert_called_with('smtp.example.com', 587)
assert len(store) == 1
mock_smtp.assert_called_with('smtp.example.com', 587)


def test_please_wait():
def test_please_wait(mocker):
token_store = tokens.DictStore()
pending = {}
mock_send = unittest.mock.MagicMock()
mock_send = mocker.MagicMock()
handler = email_addr.EmailAddress(mock_send, "this is data", token_store,
expires_time=60,
pending_storage=pending)

with unittest.mock.patch('time.time') as mock_time:
assert mock_send.call_count == 0
mock_time.return_value = 10

# First auth should call mock_send
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 1
assert 'foo@bar.com' in pending
token_value = pending['foo@bar.com']

# Second auth should not
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 1
assert 'foo@bar.com' in pending
assert token_value == pending['foo@bar.com']

# Using the link should remove the pending item
handler.check_callback('http://example/', {'t': pending['foo@bar.com']}, {})
assert 'foo@bar.com' not in pending

# Next auth should call mock_send again
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 2
assert 'foo@bar.com' in pending
assert token_value != pending['foo@bar.com']
token_value = pending['foo@bar.com']

# Timing out the token should cause it to send again
mock_time.return_value = 1000
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 3
assert 'foo@bar.com' in pending
assert token_value != pending['foo@bar.com']
token_value = pending['foo@bar.com']

# And anything else that removes the token from the token_store should as well
token_store.remove(pending['foo@bar.com'])
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 4
assert token_value != pending['foo@bar.com']
token_value = pending['foo@bar.com']
mock_time = mocker.patch('time.time')
assert mock_send.call_count == 0
mock_time.return_value = 10

# First auth should call mock_send
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 1
assert 'foo@bar.com' in pending
token_value = pending['foo@bar.com']

# Second auth should not
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 1
assert 'foo@bar.com' in pending
assert token_value == pending['foo@bar.com']

# Using the link should remove the pending item
handler.check_callback('http://example/', {'t': pending['foo@bar.com']}, {})
assert 'foo@bar.com' not in pending

# Next auth should call mock_send again
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 2
assert 'foo@bar.com' in pending
assert token_value != pending['foo@bar.com']
token_value = pending['foo@bar.com']

# Timing out the token should cause it to send again
mock_time.return_value = 1000
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 3
assert 'foo@bar.com' in pending
assert token_value != pending['foo@bar.com']
token_value = pending['foo@bar.com']

# And anything else that removes the token from the token_store should as well
token_store.remove(pending['foo@bar.com'])
handler.initiate_auth('mailto:foo@bar.com', 'http://example/', 'blop')
assert mock_send.call_count == 4
assert token_value != pending['foo@bar.com']
token_value = pending['foo@bar.com']

0 comments on commit c2f0d75

Please sign in to comment.