Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
language: python

python:
- "2.7"

script:
- DEBUG=1 nosetests -s

after_success:
- pip install coveralls
- coverage run --source=flask_oauthlib setup.py -q nosetests
- coveralls

notifications:
email: false
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ lint:
@flake8 flask-oauthlib tests

test:
@nosetests -s
@DEBUG=1 nosetests -s --nologcapture

coverage:
@rm -f .coverage
@nosetests --with-coverage --cover-package=flask-oauthlib --cover-html
@DEBUG=1 nosetests --with-coverage --cover-package=flask_oauthlib --cover-html

clean: clean-build clean-pyc clean-docs

Expand Down
7 changes: 6 additions & 1 deletion flask_oauthlib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import oauthlib.oauth2
from functools import wraps
from oauthlib.common import to_unicode
from urlparse import urljoin
from urlparse import urljoin, urlparse
from flask import request, redirect, json, session
from werkzeug import url_quote, url_decode, url_encode, parse_options_header

Expand Down Expand Up @@ -140,9 +140,13 @@ def make_request(uri, headers=None, data=None, method=None,

if test_client:
# test client is a `werkzeug.test.Client`
parsed = urlparse(uri)
uri = '%s?%s' % (parsed.path, parsed.query)
resp = test_client.open(
uri, headers=headers, data=data, method=method
)
# for compatible
resp.code = resp.status_code
return resp, resp.data

req = urllib2.Request(uri, headers=headers, data=data)
Expand Down Expand Up @@ -462,6 +466,7 @@ def handle_oauth2_response(self):
'client_secret': self.consumer_secret,
'redirect_uri': session.get('%s_oauthredir' % self.name)
}
log.debug('Prepare oauth2 remote args %r', remote_args)
remote_args.update(self.access_token_params)
if self.access_token_method == 'POST':
body = client.prepare_request_body(**remote_args)
Expand Down
8 changes: 8 additions & 0 deletions flask_oauthlib/provider/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:copyright: (c) 2013 by Hsiaoming Yang.
"""

import os
import logging
import datetime
from functools import wraps
Expand Down Expand Up @@ -397,6 +398,13 @@ def confirm_redirect_uri(self, client_id, code, redirect_uri, client,
return False
if hasattr(grant, 'validate_redirect_uri'):
return grant.validate_redirect_uri(redirect_uri)
log.debug('Compare redirect uri for grant %r and %r.',
grant.redirect_uri, redirect_uri)

if os.environ.get('DEBUG') and redirect_uri is None:
# For testing
return True

return grant.redirect_uri == redirect_uri

def confirm_scopes(self, refresh_token, scopes, request, *args, **kwargs):
Expand Down
15 changes: 6 additions & 9 deletions tests/oauth2_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
from sqlalchemy.orm import relationship
from flask_oauthlib.provider import OAuth2Provider

import logging
log = logging.getLogger('oauthlib')
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)


db = SQLAlchemy()

Expand Down Expand Up @@ -158,6 +153,8 @@ def set_grant(client_id, code, request, *args, **kwargs):

@oauth.tokensetter
def set_token(token, request, *args, **kwargs):
# In real project, a token is unique bound to user and client.
# Which means, you don't need to create a token every time.
tok = Token(**token)
tok.user_id = request.user.id
tok.client_id = request.client.client_id
Expand Down Expand Up @@ -191,13 +188,13 @@ def access_token():

@app.route('/api/email')
@oauth.require_oauth(['email'])
def email():
return jsonify(email='me@example.com')
def email(data):
return jsonify(email='me@oauth.net', username=data.user.username)

@app.route('/api/address')
@oauth.require_oauth(['address'])
def address():
return jsonify(address='earth')
def address(data):
return jsonify(address='earth', username=data.user.username)

return app

Expand Down
80 changes: 77 additions & 3 deletions tests/test_oauth2.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,82 @@
# coding: utf-8

import os
import tempfile
from urlparse import urlparse
from flask import Flask
from .oauth2_server import create_server
from .oauth2_client import create_client


app = Flask(__name__)
app.debug = True
app.secret_key = 'development'
class BaseSuite(object):
def setUp(self):
app = Flask(__name__)
app.debug = True
app.testing = True
app.secret_key = 'development'

self.db_fd, self.db_file = tempfile.mkstemp()
config = {
'SQLALCHEMY_DATABASE_URI': 'sqlite:///%s' % self.db_file
}

app = create_server(app)
app = create_client(app)

self.app = app
self.client = app.test_client()
return app

def tearDown(self):
os.close(self.db_fd)
os.unlink(self.db_file)


authorize_url = (
'/oauth/authorize?response_type=code&client_id=dev'
'&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fauthorized&scope=email'
)


class TestAuth(BaseSuite):
def test_login(self):
rv = self.client.get('/login')
assert 'response_type=code' in rv.location

def test_oauth_authorize_invalid_url(self):
rv = self.client.get('/oauth/authorize')
assert 'invalid_client_id' in rv.location

#rv = self.client.get('/oauth/authorize?client_id=dev')
#print rv.data

def test_oauth_authorize_valid_url(self):
rv = self.client.get(authorize_url)
# valid
assert '</form>' in rv.data

rv = self.client.post(authorize_url, data=dict(
confirm='no'
))
assert 'access_denied' in rv.location

rv = self.client.post(authorize_url, data=dict(
confirm='yes'
))
# success
assert 'code=' in rv.location

def test_get_access_token(self):
rv = self.client.post(authorize_url, data={'confirm': 'yes'})
rv = self.client.get(clean_url(rv.location))
assert 'access_token' in rv.data

def test_full_flow(self):
self.test_get_access_token()
rv = self.client.get('/')
assert 'username' in rv.data


def clean_url(location):
ret = urlparse(location)
return '%s?%s' % (ret.path, ret.query)