Skip to content

Commit

Permalink
Some pylint fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
arthepsy committed May 26, 2020
1 parent d801a6e commit 18b0ea0
Showing 1 changed file with 20 additions and 30 deletions.
50 changes: 20 additions & 30 deletions gp-okta.py
Expand Up @@ -142,7 +142,7 @@ def dbg_form(conf, name, data):
if k in [u'SAMLRequest', u'SAMLResponse']:
try:
saml_raw = to_n(base64.b64decode(data[k]).decode('ascii'))
dbg(True, name, '{0}.decoded: {1}'.format(k, saml_raw))
dbg(True, name, '{0}.decoded: {1}'.format(k, saml_raw))
except Exception:
pass

Expand Down Expand Up @@ -177,7 +177,7 @@ def parse_rjson(r):
except Exception as e:
err('failed to parse json: {0}'.format(e))

def parse_form(html, current_url = None):
def parse_form(html, current_url=None):
# type: (etree._Element, Optional[str]) -> Tuple[str, Dict[str, str]]
xform = html.find('.//form')
url = xform.attrib.get('action', '').strip()
Expand All @@ -201,7 +201,7 @@ def __init__(self):
self.vpn_url = '' # for reassignment
self.certs = '' # for filename
self._ocerts = False

def __getattr__(self, name):
# type: (str) -> str
if name in self._store:
Expand All @@ -227,7 +227,7 @@ def get_session(self, name):
def get_value(self, name):
# type: (str) -> str
return to_n(getattr(self, name))

def get_bool(self, name):
# type: (str) -> bool
v = self.get_value(name)
Expand All @@ -250,7 +250,7 @@ def add_cert(self, cert, name='unknown'):
self.certs = self.certs_fh.name
self.certs_fh.write(to_b(cert))
self.certs_fh.flush()

def get_verify(self, name, default_verify=True):
# type: (str, bool) -> Union[str, bool]
name = name.lower()
Expand All @@ -263,13 +263,14 @@ def get_verify(self, name, default_verify=True):
if name == 'gateway' and self._ocerts:
return self.certs
return default_verify

def get_line(self, name):
# type: (str) -> int
if name in self._lines:
return self._lines[name]
return 0

# pylint: disable=protected-access
@classmethod
def from_data(cls, content):
# type: (str) -> Conf
Expand Down Expand Up @@ -359,7 +360,7 @@ def get_state_token(conf, c):
state_token = to_n(to_b(rx_state_token.group(1)).decode('unicode_escape').strip())
return state_token

def get_redirect_url(conf, c, current_url = None):
def get_redirect_url(conf, c, current_url=None):
# type: (Conf, str, Optional[str]) -> Optional[str]
rx_base_url = re.search(r'var\s*baseUrl\s*=\s*\'([^\']+)\'', c)
rx_from_uri = re.search(r'var\s*fromUri\s*=\s*\'([^\']+)\'', c)
Expand Down Expand Up @@ -463,14 +464,13 @@ def okta_saml(conf, saml_xml):
log('okta saml request [okta_url]')
url, data = parse_form(saml_xml)
dbg_form(conf, 'okta.saml request', data)
_, _h, c = send_req(conf, 'okta', 'saml', url, data,
expected_url=conf.okta_url)
_, _h, c = send_req(conf, 'okta', 'saml', url, data, expected_url=conf.okta_url)
redirect_url = get_redirect_url(conf, c, url)
if redirect_url is None:
err('did not find redirect url')
return redirect_url

def okta_auth(conf, stateToken = None):
def okta_auth(conf, stateToken=None):
# type: (Conf, Optional[str]) -> Any
log('okta auth request [okta_url]')
url = '{0}/api/v1/authn'.format(conf.okta_url)
Expand Down Expand Up @@ -507,8 +507,7 @@ def okta_transaction_state(conf, j):
if len(state_token) == 0:
err('empty state token')
data = {'stateToken': state_token}
_, _h, j = send_json_req(conf, 'okta', 'skip', url, data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'skip', url, data, expected_url=conf.okta_url)
return False, j
# status: password_expired
# status: recovery
Expand Down Expand Up @@ -593,8 +592,7 @@ def okta_mfa_totp(conf, factor, state_token):
'passCode': code
}
log('mfa {0} totp request: {1} [okta_url]'.format(provider, code))
_, _h, j = send_json_req(conf, 'okta', 'totp mfa', factor.get('url', ''), data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'totp mfa', factor.get('url', ''), data, expected_url=conf.okta_url)
return j

def okta_mfa_sms(conf, factor, state_token):
Expand All @@ -605,15 +603,13 @@ def okta_mfa_sms(conf, factor, state_token):
'stateToken': state_token
}
log('mfa {0} sms request [okta_url]'.format(provider))
_, _h, j = send_json_req(conf, 'okta', 'sms mfa (1)', factor.get('url', ''), data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'sms mfa (1)', factor.get('url', ''), data, expected_url=conf.okta_url)
code = input('{0} SMS verification code: '.format(provider)).strip()
if len(code) == 0:
return None
data['passCode'] = code
log('mfa {0} sms request [okta_url]'.format(provider))
_, _h, j = send_json_req(conf, 'okta', 'sms mfa (2)', factor.get('url', ''), data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'sms mfa (2)', factor.get('url', ''), data, expected_url=conf.okta_url)
return j

def okta_mfa_push(conf, factor, state_token):
Expand All @@ -627,9 +623,7 @@ def okta_mfa_push(conf, factor, state_token):
status = 'MFA_CHALLENGE'
counter = 0
while status == 'MFA_CHALLENGE':
_, _h, j = send_json_req(conf, 'okta', 'push mfa ({0})'.format(counter),
factor.get('url', ''), data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'push mfa ({0})'.format(counter), factor.get('url', ''), data, expected_url=conf.okta_url)
status = j.get('status', '').strip()
dbg(conf.debug, 'status', status)
if status == 'MFA_CHALLENGE':
Expand All @@ -649,8 +643,7 @@ def okta_mfa_webauthn(conf, factor, state_token):
data = {
'stateToken': state_token
}
_, _h, j = send_json_req(conf, 'okta', 'webauthn mfa challenge', factor.get('url', ''), data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'webauthn mfa challenge', factor.get('url', ''), data, expected_url=conf.okta_url)
rfactor = j['_embedded']['factor']
profile = rfactor['profile']
purl = parse_url(conf.okta_url)
Expand Down Expand Up @@ -678,8 +671,7 @@ def okta_mfa_webauthn(conf, factor, state_token):
'authenticatorData': to_n((base64.b64encode(assertion.auth_data)).decode('ascii'))
}
log('mfa {0} signature request [okta_url]'.format(provider))
_, _h, j = send_json_req(conf, 'okta', 'uf2 mfa signature', j['_links']['next']['href'], data,
expected_url=conf.okta_url)
_, _h, j = send_json_req(conf, 'okta', 'uf2 mfa signature', j['_links']['next']['href'], data, expected_url=conf.okta_url)
return j

def okta_redirect(conf, session_token, redirect_url, gateway_url=None):
Expand Down Expand Up @@ -736,7 +728,7 @@ def okta_redirect(conf, session_token, redirect_url, gateway_url=None):
dbg(conf.debug, 'saml prop', [saml_auth_status, saml_slo])
return saml_username, prelogin_cookie

def paloalto_getconfig(conf, username = None, prelogin_cookie = None, can_fail = False):
def paloalto_getconfig(conf, username=None, prelogin_cookie=None, can_fail=False):
# type: (Conf, Optional[str], Optional[str], bool) -> Tuple[int, str, Dict[str, str]]
log('getconfig request [vpn_url]')
url = '{0}/global-protect/getconfig.esp'.format(conf.vpn_url)
Expand Down Expand Up @@ -789,14 +781,12 @@ def okta_saml_2(conf, gateway_url, saml_xml):
log('okta saml request (2) [okta_url]')
url, data = parse_form(saml_xml)
dbg_form(conf, 'okta.saml request(2)', data)
_, h, c = send_req(conf, 'okta', 'okta saml request (2)', url, data,
expected_url=conf.okta_url)
_, h, c = send_req(conf, 'okta', 'okta saml request (2)', url, data, expected_url=conf.okta_url)
xhtml = parse_html(c)
url, data = parse_form(xhtml)
dbg_form(conf, 'okta.saml request(2)', data)
log('okta redirect form request (2) [gateway]')
_, h, c = send_req(conf, 'gateway', 'okta redirect form (2)', url, data,
expected_url=gateway_url)
_, h, c = send_req(conf, 'gateway', 'okta redirect form (2)', url, data, expected_url=gateway_url)
saml_username = h.get('saml-username', '').strip()
if len(saml_username) == 0:
err('saml-username empty')
Expand Down

0 comments on commit 18b0ea0

Please sign in to comment.