Skip to content

Commit

Permalink
Merge pull request #22 from betagouv/test_token_fail
Browse files Browse the repository at this point in the history
Add checks on client call validity
  • Loading branch information
thimy committed May 28, 2019
2 parents c264f65 + 51c79a5 commit 35e84af
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 111 deletions.
6 changes: 5 additions & 1 deletion aidants_connect/settings.py
Expand Up @@ -14,6 +14,10 @@
from dotenv import load_dotenv

load_dotenv(verbose=True)
FC_CALLBACK_URL = os.environ["FC_CALLBACK_URL"]
FC_AS_FS_ID = os.environ["FC_AS_FS_ID"]
FC_AS_FS_SECRET = os.environ["FC_AS_FS_SECRET"]
HOST = os.environ["HOST"]

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
Expand All @@ -31,7 +35,7 @@
else:
DEBUG = False

ALLOWED_HOSTS = [os.getenv("HOST")]
ALLOWED_HOSTS = [os.environ["HOST"]]


# Application definition
Expand Down
70 changes: 50 additions & 20 deletions aidants_connect_web/migrations/0004_auto_20190520_1719.py
Expand Up @@ -6,34 +6,64 @@

class Migration(migrations.Migration):

dependencies = [
('aidants_connect_web', '0003_auto_20190507_1711'),
]
dependencies = [("aidants_connect_web", "0003_auto_20190507_1711")]

operations = [
migrations.CreateModel(
name='Usager',
name="Usager",
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('given_name', models.TextField()),
('family_name', models.TextField()),
('preferred_username', models.TextField(blank=True)),
('birthdate', models.DateField()),
('gender', models.CharField(choices=[('F', 'Femme'), ('H', 'Homme')], default='F', max_length=1)),
('birthplace', models.PositiveIntegerField(validators=[django.core.validators.MinValueValidator(9999), django.core.validators.MaxValueValidator(100000)])),
('birthcountry', models.IntegerField(default=99100, validators=[django.core.validators.MinValueValidator(99100), django.core.validators.MaxValueValidator(99500)])),
('sub', models.TextField(default='No Sub yet')),
('email', models.EmailField(max_length=254)),
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("given_name", models.TextField()),
("family_name", models.TextField()),
("preferred_username", models.TextField(blank=True)),
("birthdate", models.DateField()),
(
"gender",
models.CharField(
choices=[("F", "Femme"), ("H", "Homme")],
default="F",
max_length=1,
),
),
(
"birthplace",
models.PositiveIntegerField(
validators=[
django.core.validators.MinValueValidator(9999),
django.core.validators.MaxValueValidator(100000),
]
),
),
(
"birthcountry",
models.IntegerField(
default=99100,
validators=[
django.core.validators.MinValueValidator(99100),
django.core.validators.MaxValueValidator(99500),
],
),
),
("sub", models.TextField(default="No Sub yet")),
("email", models.EmailField(max_length=254)),
],
),
migrations.AddField(
model_name='connection',
name='access_token',
field=models.TextField(default='No token Provided'),
model_name="connection",
name="access_token",
field=models.TextField(default="No token Provided"),
),
migrations.AddField(
model_name='connection',
name='sub_usager',
field=models.TextField(default='No sub Provided'),
model_name="connection",
name="sub_usager",
field=models.TextField(default="No sub Provided"),
),
]
4 changes: 3 additions & 1 deletion aidants_connect_web/models.py
Expand Up @@ -4,10 +4,12 @@
from django.contrib.auth.models import AbstractUser
from django.core.validators import MinValueValidator, MaxValueValidator

CONNECTION_EXPIRATION_TIME = 10


def default_expiration_date():
now = timezone.now()
return now + timedelta(minutes=30)
return now + timedelta(minutes=CONNECTION_EXPIRATION_TIME)


class Connection(models.Model):
Expand Down
177 changes: 107 additions & 70 deletions aidants_connect_web/tests/test_views.py
@@ -1,18 +1,23 @@
import os
import json
from pytz import timezone
from secrets import token_urlsafe
from unittest.mock import patch
from datetime import date
from datetime import date, datetime, timedelta
from freezegun import freeze_time

from django.test.client import Client
from django.test import TestCase
from django.test import TestCase, override_settings
from django.urls import resolve
from django.core.exceptions import ObjectDoesNotExist

from aidants_connect_web.views import home_page, authorize, token, user_info
from aidants_connect_web.forms import UsagerForm
from aidants_connect_web.models import Connection, User, Usager
from aidants_connect_web.models import (
Connection,
User,
Usager,
CONNECTION_EXPIRATION_TIME,
)

fc_callback_url = os.getenv("FC_CALLBACK_URL")

Expand Down Expand Up @@ -133,79 +138,96 @@ def test_sending_user_information_triggers_callback(self):
self.assertRedirects(response, url, fetch_redirect_response=False)


@override_settings(
FC_AS_FS_ID="test_client_id",
FC_AS_FS_SECRET="test_client_secret",
FC_CALLBACK_URL="test_url.test_url",
HOST="localhost",
)
class TokenTests(TestCase):
def setUp(self):
self.connection = Connection()
self.connection.state = "test_state"
self.connection.code = "test_code"
self.connection.nonce = "test_nonce"
self.connection.sub_usager = "test_sub"
self.connection.expiresOn = datetime(
2012, 1, 14, 3, 21, 34, tzinfo=timezone("Europe/Paris")
)
self.connection.save()
self.fc_request = {
"grant_type": "authorization_code",
"redirect_uri": "test_url.test_url/oidc_callback",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"code": "test_code",
}

def test_token_url_triggers_token_view(self):
found = resolve("/token/")
self.assertEqual(found.func, token)

@freeze_time("2012-01-14 03:21:34", tz_offset=2)
def test_token_should_respond_when_given_correct_info(self):
with patch.dict(
"os.environ",
{
"FC_AS_FS_ID": "test_client_id",
"FC_AS_FS_SECRET": "test_client_secret",
"FC_CALLBACK_URL": "test_url.test_url",
"HOST": "localhost",
},
):
response = self.client.post(
"/token/",
{
"grant_type": "authorization_code",
"redirect_uri": "test_url.test_url/oidc_callback",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"code": "test_code",
},
)
response_content = response.content.decode("utf-8")
self.assertEqual(response.status_code, 200)
response_json = json.loads(response_content)
connection = Connection.objects.get(code="test_code")
awaited_response = {
"access_token": connection.access_token,
"expires_in": 3600,
"id_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJ0ZXN0X2Nsa"
"WVudF9pZCIsImV4cCI6MTMyNjUxMTg5NCwiaWF0IjoxMzI2NTExMjk0LCJ"
"pc3MiOiJsb2NhbGhvc3QiLCJzdWIiOiJ0ZXN0X3N1YiIsIm5vbmNl"
"IjoidGVzdF9ub25jZSJ9.VeupzW4ejtdGl2oAgOalfFGdAnxlc66G"
"SIzu3T3Ob7s",
"refresh_token": "5ieq7Bg173y99tT6MA",
"token_type": "Bearer",
}

self.assertEqual(response_json, awaited_response)

def test_token_should_respond_403_when_given_wrong_grant_type(self):
with patch.dict(
"os.environ",
{
"FC_AS_FS_ID": "test_client_id",
"FC_AS_FS_SECRET": "test_client_secret",
"FC_CALLBACK_URL": "test_url.test_url",
"HOST": "localhost",
},
):
response = self.client.post(
"/token/",
{
"grant_type": "not_authorization_code",
"redirect_uri": "test_url.test_url/oidc_callback",
"client_id": "test_client_id",
"client_secret": "test_client_secret",
"code": "test_code",
},
)
self.assertEqual(response.status_code, 403)
date = datetime(2012, 1, 14, 3, 20, 34, 0, tzinfo=timezone("Europe/Paris"))

@freeze_time(date)
def test_correct_info_triggers_200(self):

response = self.client.post("/token/", self.fc_request)

response_content = response.content.decode("utf-8")
self.assertEqual(response.status_code, 200)
response_json = json.loads(response_content)
connection = Connection.objects.get(code="test_code")
awaited_response = {
"access_token": connection.access_token,
"expires_in": 3600,
"id_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJ0ZXN0X2NsaWVud"
"F9pZCIsImV4cCI6MTMyNjUxMTI5NCwiaWF0IjoxMzI2NTEwNjk0LCJpc3MiOiJ"
"sb2NhbGhvc3QiLCJzdWIiOiJ0ZXN0X3N1YiIsIm5vbmNlIjoidGVzdF9ub25jZ"
"SJ9.aYSfYJK_Lml15DY7MuhrUBI1wja70WBfeyKqiUBMLlE",
"refresh_token": "5ieq7Bg173y99tT6MA",
"token_type": "Bearer",
}

self.assertEqual(response_json, awaited_response)

def test_wrong_grant_type_triggers_403(self):
fc_request = dict(self.fc_request)
fc_request["grant_type"] = "not_authorization_code"
response = self.client.post("/token/", fc_request)
self.assertEqual(response.status_code, 403)

def test_wrong_redirect_uri_triggers_403(self):
fc_request = dict(self.fc_request)
fc_request["redirect_uri"] = "test_url.test_url/wrong_uri"

response = self.client.post("/token/", fc_request)
self.assertEqual(response.status_code, 403)

def test_wrong_client_id_triggers_403(self):
fc_request = dict(self.fc_request)
fc_request["client_id"] = "wrong_client_id"
response = self.client.post("/token/", fc_request)
self.assertEqual(response.status_code, 403)

def test_wrong_client_secret_triggers_403(self):
fc_request = dict(self.fc_request)
fc_request["client_secret"] = "wrong_client_secret"
response = self.client.post("/token/", fc_request)
self.assertEqual(response.status_code, 403)

def test_wrong_code_triggers_403(self):
fc_request = dict(self.fc_request)
fc_request["code"] = "wrong_code"
response = self.client.post("/token/", fc_request)
self.assertEqual(response.status_code, 403)

date_expired = date + timedelta(minutes=CONNECTION_EXPIRATION_TIME + 20)

@freeze_time(date_expired)
def test_expired_code_triggers_403(self):
response = self.client.post("/token/", self.fc_request)
self.assertEqual(response.status_code, 403)


class UserInfoTests(TestCase):
Expand All @@ -218,6 +240,9 @@ def setUp(self):
self.connection.nonce = "test_nonce"
self.connection.sub_usager = "test_sub"
self.connection.access_token = "test_access_token"
self.connection.expiresOn = datetime(
2012, 1, 14, 3, 21, 34, 0, tzinfo=timezone("Europe/Paris")
)
self.connection.save()

self.usager = Usager()
Expand All @@ -236,7 +261,10 @@ def test_token_url_triggers_token_view(self):
found = resolve("/userinfo/")
self.assertEqual(found.func, user_info)

def test_user_info_is_given_when_access_token_is_right(self):
date = datetime(2012, 1, 14, 3, 20, 34, 0, tzinfo=timezone("Europe/Paris"))

@freeze_time(date)
def test_well_formatted_access_token_returns_200(self):
response = self.client.get(
"/userinfo/", **{"HTTP_AUTHORIZATION": "Bearer test_access_token"}
)
Expand All @@ -257,24 +285,33 @@ def test_user_info_is_given_when_access_token_is_right(self):

self.assertEqual(content, FC_formated_info)

def test_user_info_returns_403_when_authorization_is_badly_formated(self):
date_expired = date + timedelta(minutes=CONNECTION_EXPIRATION_TIME + 20)

@freeze_time(date_expired)
def test_expired_access_token_returns_403(self):
response = self.client.get(
"/userinfo/", **{"HTTP_AUTHORIZATION": "test_access_token"}
"/userinfo/", **{"HTTP_AUTHORIZATION": "Bearer test_access_token"}
)

self.assertEqual(response.status_code, 403)

def test_user_info_returns_403_when_authorization_has_wrong_token(self):
def test_badly_formatted_authorization_header_triggers_403(self):
response = self.client.get(
"/userinfo/", **{"HTTP_AUTHORIZATION": "wrong_access_token"}
"/userinfo/", **{"HTTP_AUTHORIZATION": "test_access_token"}
)
self.assertEqual(response.status_code, 403)

def test_user_info_returns_403_when_authorization_has_wrong_intro(self):
response = self.client.get(
"/userinfo/", **{"HTTP_AUTHORIZATION": "Bearer: test_access_token"}
)
self.assertEqual(response.status_code, 403)

def test_wrong_token_triggers_403(self):
response = self.client.get(
"/userinfo/", **{"HTTP_AUTHORIZATION": "wrong_access_token"}
)
self.assertEqual(response.status_code, 403)


class EnvironmentVariableTest(TestCase):
def test_environment_variables_are_accessible(self):
Expand Down

0 comments on commit 35e84af

Please sign in to comment.