diff --git a/gp-okta.py b/gp-okta.py index e3067e8..4eb2467 100755 --- a/gp-okta.py +++ b/gp-okta.py @@ -142,7 +142,7 @@ def dbg_form(conf, name, data): if k in [u'SAMLRequest', u'SAMLResponse']: try: saml_raw = to_n(base64.b64decode(data[k]).decode('ascii')) - dbg(True, name, '{0}.decoded: {1}'.format(k, saml_raw)) + dbg(True, name, '{0}.decoded: {1}'.format(k, saml_raw)) except Exception: pass @@ -177,7 +177,7 @@ def parse_rjson(r): except Exception as e: err('failed to parse json: {0}'.format(e)) -def parse_form(html, current_url = None): +def parse_form(html, current_url=None): # type: (etree._Element, Optional[str]) -> Tuple[str, Dict[str, str]] xform = html.find('.//form') url = xform.attrib.get('action', '').strip() @@ -201,7 +201,7 @@ def __init__(self): self.vpn_url = '' # for reassignment self.certs = '' # for filename self._ocerts = False - + def __getattr__(self, name): # type: (str) -> str if name in self._store: @@ -227,7 +227,7 @@ def get_session(self, name): def get_value(self, name): # type: (str) -> str return to_n(getattr(self, name)) - + def get_bool(self, name): # type: (str) -> bool v = self.get_value(name) @@ -250,7 +250,7 @@ def add_cert(self, cert, name='unknown'): self.certs = self.certs_fh.name self.certs_fh.write(to_b(cert)) self.certs_fh.flush() - + def get_verify(self, name, default_verify=True): # type: (str, bool) -> Union[str, bool] name = name.lower() @@ -263,13 +263,14 @@ def get_verify(self, name, default_verify=True): if name == 'gateway' and self._ocerts: return self.certs return default_verify - + def get_line(self, name): # type: (str) -> int if name in self._lines: return self._lines[name] return 0 + # pylint: disable=protected-access @classmethod def from_data(cls, content): # type: (str) -> Conf @@ -359,7 +360,7 @@ def get_state_token(conf, c): state_token = to_n(to_b(rx_state_token.group(1)).decode('unicode_escape').strip()) return state_token -def get_redirect_url(conf, c, current_url = None): +def get_redirect_url(conf, c, current_url=None): # type: (Conf, str, Optional[str]) -> Optional[str] rx_base_url = re.search(r'var\s*baseUrl\s*=\s*\'([^\']+)\'', c) rx_from_uri = re.search(r'var\s*fromUri\s*=\s*\'([^\']+)\'', c) @@ -463,14 +464,13 @@ def okta_saml(conf, saml_xml): log('okta saml request [okta_url]') url, data = parse_form(saml_xml) dbg_form(conf, 'okta.saml request', data) - _, _h, c = send_req(conf, 'okta', 'saml', url, data, - expected_url=conf.okta_url) + _, _h, c = send_req(conf, 'okta', 'saml', url, data, expected_url=conf.okta_url) redirect_url = get_redirect_url(conf, c, url) if redirect_url is None: err('did not find redirect url') return redirect_url -def okta_auth(conf, stateToken = None): +def okta_auth(conf, stateToken=None): # type: (Conf, Optional[str]) -> Any log('okta auth request [okta_url]') url = '{0}/api/v1/authn'.format(conf.okta_url) @@ -507,8 +507,7 @@ def okta_transaction_state(conf, j): if len(state_token) == 0: err('empty state token') data = {'stateToken': state_token} - _, _h, j = send_json_req(conf, 'okta', 'skip', url, data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'skip', url, data, expected_url=conf.okta_url) return False, j # status: password_expired # status: recovery @@ -593,8 +592,7 @@ def okta_mfa_totp(conf, factor, state_token): 'passCode': code } log('mfa {0} totp request: {1} [okta_url]'.format(provider, code)) - _, _h, j = send_json_req(conf, 'okta', 'totp mfa', factor.get('url', ''), data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'totp mfa', factor.get('url', ''), data, expected_url=conf.okta_url) return j def okta_mfa_sms(conf, factor, state_token): @@ -605,15 +603,13 @@ def okta_mfa_sms(conf, factor, state_token): 'stateToken': state_token } log('mfa {0} sms request [okta_url]'.format(provider)) - _, _h, j = send_json_req(conf, 'okta', 'sms mfa (1)', factor.get('url', ''), data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'sms mfa (1)', factor.get('url', ''), data, expected_url=conf.okta_url) code = input('{0} SMS verification code: '.format(provider)).strip() if len(code) == 0: return None data['passCode'] = code log('mfa {0} sms request [okta_url]'.format(provider)) - _, _h, j = send_json_req(conf, 'okta', 'sms mfa (2)', factor.get('url', ''), data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'sms mfa (2)', factor.get('url', ''), data, expected_url=conf.okta_url) return j def okta_mfa_push(conf, factor, state_token): @@ -627,9 +623,7 @@ def okta_mfa_push(conf, factor, state_token): status = 'MFA_CHALLENGE' counter = 0 while status == 'MFA_CHALLENGE': - _, _h, j = send_json_req(conf, 'okta', 'push mfa ({0})'.format(counter), - factor.get('url', ''), data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'push mfa ({0})'.format(counter), factor.get('url', ''), data, expected_url=conf.okta_url) status = j.get('status', '').strip() dbg(conf.debug, 'status', status) if status == 'MFA_CHALLENGE': @@ -649,8 +643,7 @@ def okta_mfa_webauthn(conf, factor, state_token): data = { 'stateToken': state_token } - _, _h, j = send_json_req(conf, 'okta', 'webauthn mfa challenge', factor.get('url', ''), data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'webauthn mfa challenge', factor.get('url', ''), data, expected_url=conf.okta_url) rfactor = j['_embedded']['factor'] profile = rfactor['profile'] purl = parse_url(conf.okta_url) @@ -678,8 +671,7 @@ def okta_mfa_webauthn(conf, factor, state_token): 'authenticatorData': to_n((base64.b64encode(assertion.auth_data)).decode('ascii')) } log('mfa {0} signature request [okta_url]'.format(provider)) - _, _h, j = send_json_req(conf, 'okta', 'uf2 mfa signature', j['_links']['next']['href'], data, - expected_url=conf.okta_url) + _, _h, j = send_json_req(conf, 'okta', 'uf2 mfa signature', j['_links']['next']['href'], data, expected_url=conf.okta_url) return j def okta_redirect(conf, session_token, redirect_url, gateway_url=None): @@ -736,7 +728,7 @@ def okta_redirect(conf, session_token, redirect_url, gateway_url=None): dbg(conf.debug, 'saml prop', [saml_auth_status, saml_slo]) return saml_username, prelogin_cookie -def paloalto_getconfig(conf, username = None, prelogin_cookie = None, can_fail = False): +def paloalto_getconfig(conf, username=None, prelogin_cookie=None, can_fail=False): # type: (Conf, Optional[str], Optional[str], bool) -> Tuple[int, str, Dict[str, str]] log('getconfig request [vpn_url]') url = '{0}/global-protect/getconfig.esp'.format(conf.vpn_url) @@ -789,14 +781,12 @@ def okta_saml_2(conf, gateway_url, saml_xml): log('okta saml request (2) [okta_url]') url, data = parse_form(saml_xml) dbg_form(conf, 'okta.saml request(2)', data) - _, h, c = send_req(conf, 'okta', 'okta saml request (2)', url, data, - expected_url=conf.okta_url) + _, h, c = send_req(conf, 'okta', 'okta saml request (2)', url, data, expected_url=conf.okta_url) xhtml = parse_html(c) url, data = parse_form(xhtml) dbg_form(conf, 'okta.saml request(2)', data) log('okta redirect form request (2) [gateway]') - _, h, c = send_req(conf, 'gateway', 'okta redirect form (2)', url, data, - expected_url=gateway_url) + _, h, c = send_req(conf, 'gateway', 'okta redirect form (2)', url, data, expected_url=gateway_url) saml_username = h.get('saml-username', '').strip() if len(saml_username) == 0: err('saml-username empty')