Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve issue 128 - yubikey auth flow broken by google page update #136

Merged
merged 7 commits into from
Jul 23, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 55 additions & 13 deletions aws_google_auth/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import logging
import os
import re
import sys

import requests
Expand Down Expand Up @@ -132,6 +133,42 @@ def parse_error_message(sess):
else:
return error.text

@staticmethod
def find_key_handle(input, challengeTxt):
typeOfInput = type(input)
if typeOfInput == dict: # parse down a dict
for item in input:
rvalue = Google.find_key_handle(input[item], challengeTxt)
if rvalue is not None:
return rvalue

elif typeOfInput == list: # looks like we've hit an array - iterate it
array = list(filter(None, input)) # remove any None type objects from the array
for item in array:
typeValue = type(item)
if typeValue == list: # another array - recursive call
return Google.find_key_handle(item, challengeTxt)
elif typeValue == int or typeValue == bool: # ints bools etc we don't care
continue
else: # we went a string or unicode here (python 3.x lost unicode global)
try: # keyHandle string will be base64 encoded -
# if its not an exception is thrown and we continue as its not the string we're after
base64UrlEncoded = base64.urlsafe_b64encode(base64.b64decode(item))
if base64UrlEncoded != challengeTxt: # make sure its not the challengeTxt - if it not return it
return base64UrlEncoded
except:
pass

@staticmethod
def find_app_id(inputString):
try:
searchResult = re.search('"appid":"[a-z://.-_]+"', inputString).group()
searchObject = json.loads('{' + searchResult + '}')
return str(searchObject['appid'])
except:
logging.exception('Was unable to find appid value in googles SAML page')
sys.exit(1)

def do_login(self):
self.session = requests.Session()
self.session.headers['User-Agent'] = "AWS Sign-in/{} (Cevo aws-google-auth)".format(self.version)
Expand Down Expand Up @@ -357,26 +394,35 @@ def handle_captcha(self, sess, payload):
def handle_sk(self, sess):
response_page = BeautifulSoup(sess.text, 'html.parser')
challenge_url = sess.url.split("?")[0]

challenges_txt = response_page.find('input', {
'name': "id-challenge"
}).get('value')
challenges = json.loads(challenges_txt)

facet_url = urllib_parse.urlparse(challenge_url)
facet = facet_url.scheme + "://" + facet_url.netloc
app_id = challenges["appId"]

keyHandleJSField = response_page.find('div', {'jsname': 'C0oDBd'}).get('data-challenge-ui')
startJSONPosition = keyHandleJSField.find('{')
endJSONPosition = keyHandleJSField.rfind('}')
keyHandleJsonPayload = json.loads(keyHandleJSField[startJSONPosition:endJSONPosition + 1])

keyHandle = self.find_key_handle(keyHandleJsonPayload, base64.urlsafe_b64encode(base64.b64decode(challenges_txt)))
appId = self.find_app_id(str(keyHandleJsonPayload))

# txt sent for signing needs to be base64 url encode
# we also have to remove any base64 padding because including including it will prevent google accepting the auth response
challenges_txt_encode_pad_removed = base64.urlsafe_b64encode(base64.b64decode(challenges_txt)).strip('='.encode())

u2f_challenges = []
for c in challenges["challenges"]:
c["appId"] = app_id
u2f_challenges.append(c)
u2f_challenges.append({'version': 'U2F_V2', 'challenge': challenges_txt_encode_pad_removed.decode(), 'appId': appId, 'keyHandle': keyHandle.decode()})

# Prompt the user up to attempts_remaining times to insert their U2F device.
attempts_remaining = 5
auth_response = None
while True:
try:
auth_response = json.dumps(u2f.u2f_auth(u2f_challenges, facet))
auth_response_dict = u2f.u2f_auth(u2f_challenges, facet)
auth_response = json.dumps(auth_response_dict)
break
except RuntimeWarning:
logging.error("No U2F device found. %d attempts remaining",
Expand All @@ -403,8 +449,7 @@ def handle_sk(self, sess):
response_page.find('input', {
'name': 'challengeType'
}).get('value'),
'continue':
response_page.find('input', {
'continue': response_page.find('input', {
'name': 'continue'
}).get('value'),
'scc':
Expand All @@ -419,10 +464,7 @@ def handle_sk(self, sess):
response_page.find('input', {
'name': 'checkedDomains'
}).get('value'),
'pstMsg':
response_page.find('input', {
'name': 'pstMsg'
}).get('value'),
'pstMsg': '1',
'TL':
response_page.find('input', {
'name': 'TL'
Expand Down