Skip to content

Commit

Permalink
Merge 69eb357 into d3e2e35
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmxgti committed Oct 29, 2023
2 parents d3e2e35 + 69eb357 commit 944d58e
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 61 deletions.
101 changes: 97 additions & 4 deletions tests/unit/test_duo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
"""Unit tests, and local fixtures for DUO module."""
from unittest.mock import Mock

import pytest

def test_set_passcode(mocker):

def test_get_passcode(mocker):
"""Check if numerical passcode can handle leading zero values."""
from tokendito import duo

mocker.patch("tokendito.user.tty_assertion", return_value=True)
mocker.patch("tokendito.user.input", return_value="0123456")
assert duo.set_passcode({"factor": "passcode"}) == "0123456"
assert duo.get_passcode({"factor": "passcode"}) == "0123456"
assert duo.get_passcode({"factor": "PassCode"}) == "0123456"
assert duo.get_passcode({"factor": "push"}) is None
assert duo.get_passcode("pytest") is None


def test_prepare_duo_info():
Expand Down Expand Up @@ -51,6 +56,10 @@ def test_prepare_duo_info():
}
assert prepare_duo_info(selected_okta_factor) == expected_duo_info

with pytest.raises(SystemExit) as err:
prepare_duo_info({"badresponse": "FAIL"})
assert err.value.code == 1


def test_get_duo_sid(mocker):
"""Check if got sid correct."""
Expand Down Expand Up @@ -87,6 +96,90 @@ def test_get_mfa_response():
from tokendito.duo import get_mfa_response

mfa_result = Mock()
mfa_result.json = Mock(return_value={"response": "test_response"})

assert get_mfa_response(mfa_result) == "test_response"
mfa_result.json = Mock(return_value={"response": "test_value"})
assert get_mfa_response(mfa_result) == "test_value"

with pytest.raises(SystemExit) as err:
get_mfa_response(Mock(return_value={"badresponse": "FAIL"}))
assert err.value.code == 1

with pytest.raises(SystemExit) as err:
get_mfa_response(Mock(return_value="FAIL"))
assert err.value.code == 1


def test_duo_api_post(mocker):
"""Test if duo api post correctly."""
from tokendito.duo import duo_api_post

mock_post = mocker.patch("requests.Session.post")
mock_resp = mocker.Mock()
mock_resp.status_code = 201
mock_resp.json.return_value = {"status": "pytest"}
mock_post.return_value = mock_resp

response = duo_api_post("https://pytest/")
assert response == {"status": "pytest"}


def test_get_duo_devices(mocker):
"""TODO: Implement test."""
from tokendito.duo import get_duo_devices

mock_resp = mocker.Mock()
mock_resp.status_code = 200
mock_resp.content = "<html></html>"

with pytest.raises(SystemExit) as err:
get_duo_devices(mock_resp)
assert err.value.code == 2

mock_resp.content = """
<select name='device'>
<option value='pytest_val'>pytest_text</option>
</select>
"""
assert get_duo_devices(mock_resp) == []

mock_resp.content = """
<select name='device'>
<option value='pytest_device'>pytest_device_name</option>
</select>
<fieldset data-device-index='pytest_device'>
<input name='factor' value='factor_type'>
</fieldset>
"""
assert get_duo_devices(mock_resp) == [
{"device": "pytest_device - pytest_device_name", "factor": "factor_type"}
]


def test_parse_duo_mfa_challenge():
"""TODO: Implement test."""
pass


def test_get_duo_mfa_challenge():
"""TODO: Implement test."""
pass


def test_parse_challenge():
"""TODO: Implement test."""
pass


def test_mfa_verify():
"""TODO: Implement test."""
pass


def test_duo_factor_callback():
"""TODO: Implement test."""
pass


def test_authenticate_duo():
"""TODO: Implement test."""
pass
101 changes: 46 additions & 55 deletions tokendito/duo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
from urllib.parse import unquote
from urllib.parse import urlparse

import bs4
from bs4 import BeautifulSoup
import requests
from tokendito import user
from tokendito.config import config
from tokendito.http_client import HTTP_client

logger = logging.getLogger(__name__)

Expand All @@ -23,21 +24,24 @@ def prepare_duo_info(selected_okta_factor):
:return duo_info: dict of parameters for Duo
"""
duo_info = {}
okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"]
duo_info["okta_factor"] = okta_factor
duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"]

duo_info["state_token"] = selected_okta_factor["stateToken"]
duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"]
duo_info["tx"] = okta_factor["signature"].split(":")[0]
duo_info["tile_sig"] = okta_factor["signature"].split(":")[1]
duo_info["parent"] = f"{config.okta['org']}/signin/verify/duo/web"
duo_info["host"] = okta_factor["host"]
duo_info["sid"] = ""

version = okta_factor["_links"]["script"]["href"].split("-v")[1]
duo_info["version"] = version.strip(".js")

try:
okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"]
duo_info["okta_factor"] = okta_factor
duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"]

duo_info["state_token"] = selected_okta_factor["stateToken"]
duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"]
duo_info["tx"] = okta_factor["signature"].split(":")[0]
duo_info["tile_sig"] = okta_factor["signature"].split(":")[1]
duo_info["parent"] = f"{config.okta['org']}/signin/verify/duo/web"
duo_info["host"] = okta_factor["host"]
duo_info["sid"] = ""

version = okta_factor["_links"]["script"]["href"].split("-v")[1]
duo_info["version"] = version.strip(".js")
except KeyError as missing_key:
logger.error(f"There was an issue parsing the Okta factor. Please try again: {missing_key}")
sys.exit(1)
return duo_info


Expand All @@ -50,29 +54,7 @@ def duo_api_post(url, params=None, headers=None, payload=None):
:param payload: Request body.
:return response: Response to the API request.
"""
try:
response = requests.request("POST", url, params=params, headers=headers, data=payload)
except Exception as request_issue:
logger.error(f"There was an error connecting to the Duo API: {request_issue}")
sys.exit(1)

json_message = None
try:
json_message = response.json()
except ValueError:
logger.debug(f"Non-json response from Duo API: {response}")

if response.status_code != 200:
logger.error(f"Your Duo authentication has failed with status {response.status_code}.")
if json_message and json_message["stat"].lower() != "ok":
logger.error(f"{response.status_code}, {json_message['message']}")
else:
logger.error(
"Please re-run the program with parameter"
' "--loglevel debug" to see more information.'
)
sys.exit(2)

response = HTTP_client.post(url, params=params, headers=headers, data=payload, return_json=True)
return response


Expand Down Expand Up @@ -113,17 +95,22 @@ def get_duo_devices(duo_auth):
:return factor_options: list of dict objects describing each MFA option.
"""
soup = BeautifulSoup(duo_auth.content, "html.parser")
devices = []

device_soup = soup.find("select", {"name": "device"}).findAll("option") # type: ignore
devices = [f"{d['value']} - {d.text}" for d in device_soup]
device_soup = soup.find("select", {"name": "device"})
if type(device_soup) is bs4.element.Tag:
options = device_soup.findAll("option")
devices = [f"{d['value']} - {d.text}" for d in options]
if not devices:
logger.error("Please configure devices for your Duo MFA and retry.")
sys.exit(2)

factor_options = []
factors = []
for device in devices:
options = soup.find("fieldset", {"data-device-index": device.split(" - ")[0]})
factors = options.findAll("input", {"name": "factor"}) # type: ignore (PEP 561)
if type(options) is bs4.element.Tag:
factors = options.findAll("input", {"name": "factor"})
for factor in factors:
factor_option = {"device": device, "factor": factor["value"]}
factor_options.append(factor_option)
Expand Down Expand Up @@ -193,8 +180,12 @@ def get_mfa_response(mfa_result):
"""
try:
verify_mfa = mfa_result.json()["response"]
except Exception as parse_error:
logger.error(f"There was an error parsing the mfa challenge result: {parse_error}")
except KeyError as key_error:
logger.error(f"The mfa challenge response is missing a required parameter: {key_error}")
logger.debug(json.dumps(mfa_result.json()))
sys.exit(1)
except Exception as other_error:
logger.error(f"There was an error getting the mfa challenge result: {other_error}")
sys.exit(1)
return verify_mfa

Expand Down Expand Up @@ -283,18 +274,22 @@ def duo_factor_callback(duo_info, verify_mfa):
return sig_response


def set_passcode(mfa_option):
"""Set totp passcode.
def get_passcode(mfa_option):
"""Get totp passcode.
If the user has selected the passcode option, collect their TOTP.
:param mfa_option: selected factor
:return passcode: passcode value from user
"""
passcode = None
if mfa_option["factor"].lower() == "passcode":
user.print("Type your TOTP and press Enter:")
passcode = user.get_input()

try:
if mfa_option["factor"].lower() == "passcode":
user.print("Type your TOTP and press Enter:")
passcode = user.get_input()
except (TypeError, KeyError):
pass
return passcode


Expand All @@ -309,11 +304,7 @@ def authenticate_duo(selected_okta_factor):
:return payload: required payload for Okta callback
:return headers: required headers for Okta callback
"""
try:
duo_info = prepare_duo_info(selected_okta_factor)
except KeyError as missing_key:
logger.error(f"There was an issue parsing the Okta factor. Please try again: {missing_key}")
sys.exit(1)
duo_info = prepare_duo_info(selected_okta_factor)
# Collect devices, factors, auth params for Duo
duo_info, duo_auth_response = get_duo_sid(duo_info)
factor_options = get_duo_devices(duo_auth_response)
Expand All @@ -323,7 +314,7 @@ def authenticate_duo(selected_okta_factor):

mfa_option = factor_options[mfa_index]
logger.debug(f"Selected MFA is [{mfa_option}]")
passcode = set_passcode(mfa_option)
passcode = get_passcode(mfa_option)

txid = duo_mfa_challenge(duo_info, mfa_option, passcode)
verify_mfa = duo_mfa_verify(duo_info, txid)
Expand Down
4 changes: 2 additions & 2 deletions tokendito/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ def get(self, url, params=None, headers=None):
logger.error(f"The get request to {url} failed with {err}")
sys.exit(1)

def post(self, url, data=None, json=None, headers=None, return_json=False):
def post(self, url, data=None, json=None, headers=None, params=None, return_json=False):
"""Perform a POST request."""
try:
response = self.session.post(url, data=data, json=json, headers=headers)
response = self.session.post(url, data=data, json=json, params=params, headers=headers)
response.raise_for_status()
if return_json is True:
try:
Expand Down

0 comments on commit 944d58e

Please sign in to comment.