Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: jans-cli-tui working branch 5 #3649

Merged
merged 14 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 107 additions & 57 deletions jans-cli-tui/cli_tui/cli/config_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import pyDes
import stat
import ruamel.yaml

import urllib.parse

from pathlib import Path
from types import SimpleNamespace
Expand All @@ -46,7 +46,14 @@
config_ini_fn = config_dir.joinpath('jans-cli.ini')
sys.path.append(cur_dir)

my_op_mode = 'scim' if 'scim' in os.path.basename(sys.argv[0]) or '-scim' in sys.argv else 'jca'

if 'scim' in os.path.basename(sys.argv[0]) or '-scim' in sys.argv:
my_op_mode = 'scim'
elif 'auth' in os.path.basename(sys.argv[0]) or '-auth' in sys.argv:
my_op_mode = 'auth'
else:
my_op_mode = 'jca'

plugins = []

warning_color = 214
Expand Down Expand Up @@ -110,26 +117,30 @@ def get_plugin_name_from_title(title):
return title[n+1:].strip()
return ''

# load yaml files
cfg_yaml = {}
op_list = []
cfg_yaml[my_op_mode] = {}
for yaml_fn in glob.glob(os.path.join(cur_dir, 'ops', my_op_mode, '*.yaml')):
fn, ext = os.path.splitext(os.path.basename(yaml_fn))
with open(yaml_fn) as f:
config_ = ruamel.yaml.load(f.read().replace('\t', ''), ruamel.yaml.RoundTripLoader)
plugin_name = get_plugin_name_from_title(config_['info']['title'])
cfg_yaml[my_op_mode][plugin_name] = config_

for path in config_['paths']:
for method in config_['paths'][path]:
if isinstance(config_['paths'][path][method], dict):
for tag_ in config_['paths'][path][method].get('tags', []):
tag = get_named_tag(tag_)
if not tag in op_list:
op_list.append(tag)


# load yaml files
def read_swagger(op_mode):
op_list = []
cfg_yaml[op_mode] = {}
for yaml_fn in glob.glob(os.path.join(cur_dir, 'ops', op_mode, '*.yaml')):
fn, ext = os.path.splitext(os.path.basename(yaml_fn))
with open(yaml_fn) as f:
config_ = ruamel.yaml.load(f.read().replace('\t', ''), ruamel.yaml.RoundTripLoader)
plugin_name = get_plugin_name_from_title(config_['info']['title'])
cfg_yaml[op_mode][plugin_name] = config_

for path in config_['paths']:
for method in config_['paths'][path]:
if isinstance(config_['paths'][path][method], dict):
for tag_ in config_['paths'][path][method].get('tags', []):
tag = get_named_tag(tag_)
if not tag in op_list:
op_list.append(tag)
return op_list

op_list = read_swagger(my_op_mode)
op_list.sort()

parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -164,6 +175,7 @@ def get_plugin_name_from_title(title):
parser.add_argument("--log-dir", help="Log directory", default=log_dir)
parser.add_argument("-revoke-session", help="Revokes session", action='store_true')
parser.add_argument("-scim", help="SCIM Mode", action='store_true', default=False)
parser.add_argument("-auth", help="Jans OAuth Server Mode", action='store_true', default=False)

parser.add_argument("--data", help="Path to json data file")
args = parser.parse_args()
Expand Down Expand Up @@ -242,11 +254,13 @@ def write_config():

class JCA_CLI:

def __init__(self, host, client_id, client_secret, access_token, test_client=False, wrapped=None):
def __init__(self, host, client_id, client_secret, access_token, test_client=False, op_mode=None, wrapped=None):
self.host = self.idp_host = host
self.client_id = client_id
self.client_secret = client_secret
self.use_test_client = test_client
self.my_op_mode = op_mode if op_mode else my_op_mode

self.getCredentials()
self.wrapped = wrapped
if wrapped == None:
Expand All @@ -258,11 +272,15 @@ def __init__(self, host, client_id, client_secret, access_token, test_client=Fal
self.set_user()
self.plugins()

if my_op_mode == 'jca':
self.host += '/jans-config-api'
if self.my_op_mode not in cfg_yaml:
read_swagger(self.my_op_mode)

if my_op_mode == 'scim':
if self.my_op_mode == 'jca':
self.host += '/jans-config-api'
elif self.my_op_mode == 'scim':
self.host += '/jans-scim/restv1/v2'
elif self.my_op_mode == 'auth':
self.host += '/jans-auth/restv1'

self.set_logging()
self.ssl_settings()
Expand Down Expand Up @@ -350,7 +368,7 @@ def set_user(self):
sys.exit()

def plugins(self):
for plugin_s in config['DEFAULT'].get(my_op_mode + '_plugins', '').split(','):
for plugin_s in config['DEFAULT'].get(self.my_op_mode + '_plugins', '').split(','):
plugin = plugin_s.strip()
if plugin:
plugins.append(plugin)
Expand All @@ -370,13 +388,17 @@ def drop_to_shell(self, mylocals):
code.interact(local=locals_)
sys.exit()

def get_request_header(self, headers={}, access_token=None):

def get_request_header(self, headers=None, access_token=None):
if headers is None:
headers = {}

if not access_token:
access_token = self.access_token

user = self.get_user_info()
if 'inum' in user:
headers['User-inum'] = user['inum']
user = self.get_user_info()
if 'inum' in user:
headers['User-inum'] = user['inum']

ret_val = {'Authorization': 'Bearer {}'.format(access_token)}
ret_val.update(headers)
Expand Down Expand Up @@ -507,7 +529,7 @@ def validate_date_time(self, date_str):
return False


def get_scoped_access_token(self, scope):
def get_scoped_access_token(self, scope, set_access_token=True):

if not self.wrapped:
scope_text = " for scope {}\n".format(scope) if scope else ''
Expand All @@ -521,9 +543,11 @@ def get_scoped_access_token(self, scope):
else:
post_params = {"grant_type": "client_credentials", "scope": scope}

client = self.use_test_client or self.client_id

response = requests.post(
url,
auth=(self.use_test_client, self.client_secret),
auth=(client, self.client_secret),
data=post_params,
verify=self.verify_ssl,
cert=self.mtls_client_cert
Expand All @@ -532,7 +556,10 @@ def get_scoped_access_token(self, scope):
try:
result = response.json()
if 'access_token' in result:
self.access_token = result['access_token']
if set_access_token:
self.access_token = result['access_token']
else:
return result['access_token']
else:
sys.stderr.write("Error while getting access token")
sys.stderr.write(result)
Expand Down Expand Up @@ -692,11 +719,12 @@ def get_jwt_access_token(self, device_verified=None):
return True, ''

def get_access_token(self, scope):
if self.use_test_client:
self.get_scoped_access_token(scope)
elif not self.access_token and not self.wrapped:
self.check_access_token()
self.get_jwt_access_token()
if self.my_op_mode != 'auth':
if self.use_test_client:
self.get_scoped_access_token(scope)
elif not self.access_token and not self.wrapped:
self.check_access_token()
self.get_jwt_access_token()
return True, ''

def print_exception(self, e):
Expand Down Expand Up @@ -1010,12 +1038,12 @@ def obtain_parameters(self, endpoint, single=False):

def get_path_by_id(self, operation_id):
retVal = {}
for plugin in cfg_yaml[my_op_mode]:
for path in cfg_yaml[my_op_mode][plugin]['paths']:
for method in cfg_yaml[my_op_mode][plugin]['paths'][path]:
if 'operationId' in cfg_yaml[my_op_mode][plugin]['paths'][path][method] and cfg_yaml[my_op_mode][plugin]['paths'][path][method][
for plugin in cfg_yaml[self.my_op_mode]:
for path in cfg_yaml[self.my_op_mode][plugin]['paths']:
for method in cfg_yaml[self.my_op_mode][plugin]['paths'][path]:
if 'operationId' in cfg_yaml[self.my_op_mode][plugin]['paths'][path][method] and cfg_yaml[self.my_op_mode][plugin]['paths'][path][method][
'operationId'] == operation_id:
retVal = cfg_yaml[my_op_mode][plugin]['paths'][path][method].copy()
retVal = cfg_yaml[self.my_op_mode][plugin]['paths'][path][method].copy()
retVal['__path__'] = path
retVal['__method__'] = method
retVal['__urlsuffix__'] = self.get_url_param(path)
Expand Down Expand Up @@ -1075,8 +1103,10 @@ def get_requests(self, endpoint, params={}):
self.print_exception(e)

def get_mime_for_endpoint(self, endpoint, req='requestBody'):
for key in endpoint.info[req]['content']:
return key
if req in endpoint.info:
for key in endpoint.info[req]['content']:
return key


def post_requests(self, endpoint, data):
url = 'https://{}{}'.format(self.host, endpoint.path)
Expand All @@ -1085,6 +1115,8 @@ def post_requests(self, endpoint, data):
mime_type = self.get_mime_for_endpoint(endpoint)

headers = self.get_request_header({'Accept': 'application/json', 'Content-Type': mime_type})
if mime_type:
headers['Content-Type'] = mime_type

response = requests.post(url,
headers=headers,
Expand All @@ -1101,20 +1133,33 @@ def post_requests(self, endpoint, data):
try:
return response.json()
except:
print(response.text)
return {'server_error': response.text}


def delete_requests(self, endpoint, url_param_dict):
security = self.get_scope_for_endpoint(endpoint)
self.get_access_token(security)
url_params = self.get_url_param(endpoint.path)

if url_params:
url_path = endpoint.path.format(**url_param_dict)
for param in url_params:
del url_param_dict[param]
else:
url_path = endpoint.path

if url_param_dict:
url_path += '?'+ urllib.parse.urlencode(url_param_dict)

response = requests.delete(
url='https://{}{}'.format(self.host, endpoint.path.format(**url_param_dict)),
url='https://{}{}'.format(self.host, url_path),
headers=self.get_request_header({'Accept': 'application/json'}),
verify=self.verify_ssl,
cert=self.mtls_client_cert
)

self.log_response(response)

if response.status_code in (200, 204):
return None

Expand Down Expand Up @@ -1217,15 +1262,14 @@ def help_for(self, op_name):

schema_path = None

for plugin in cfg_yaml[my_op_mode]:
for path_name in cfg_yaml[my_op_mode][plugin]['paths']:
for method in cfg_yaml[my_op_mode][plugin]['paths'][path_name]:
path = cfg_yaml[my_op_mode][plugin]['paths'][path_name][method]
for plugin in cfg_yaml[self.my_op_mode]:
for path_name in cfg_yaml[self.my_op_mode][plugin]['paths']:
for method in cfg_yaml[self.my_op_mode][plugin]['paths'][path_name]:
path = cfg_yaml[self.my_op_mode][plugin]['paths'][path_name][method]
if isinstance(path, dict):
for tag_ in path.get('tags', []):
tag = get_named_tag(tag_)
if tag == op_name:
title = cfg_yaml[my_op_mode][plugin]['info']['title']
mode_suffix = plugin+ ':' if plugin else ''
print('Operation ID:', path['operationId'])
print(' Description:', path['description'])
Expand All @@ -1247,7 +1291,13 @@ def help_for(self, op_name):
if 'requestBody' in path:
for apptype in path['requestBody'].get('content', {}):
if 'schema' in path['requestBody']['content'][apptype]:
if path['requestBody']['content'][apptype]['schema'].get('type') == 'array':
if path['requestBody']['content'][apptype]['schema'].get('type') == 'object' and '$ref' not in path['requestBody']['content'][apptype]['schema']:
print(' Parameters:')
for param in path['requestBody']['content'][apptype]['schema']['properties']:
req_s = '*' if param in path['requestBody']['content'][apptype]['schema'].get('required', []) else ''
print(' {}{}: {}'.format(param, req_s, path['requestBody']['content'][apptype]['schema']['properties'][param].get('description') or "Description not found for this property"))

elif path['requestBody']['content'][apptype]['schema'].get('type') == 'array':
schema_path = path['requestBody']['content'][apptype]['schema']['items']['$ref']
print(' Schema: Array of {}{}'.format(mode_suffix, os.path.basename(schema_path)))
else:
Expand Down Expand Up @@ -1327,7 +1377,7 @@ def process_command_post(self, path, suffix_param, endpoint_params, data_fn, dat

endpoint = self.get_fake_endpoint(path)

if not data:
if not data and data_fn:

if data_fn.endswith('jwt'):
with open(data_fn) as reader:
Expand Down Expand Up @@ -1389,6 +1439,7 @@ def process_command_patch(self, path, suffix_param, endpoint_params, data_fn, da
def process_command_delete(self, path, suffix_param, endpoint_params, data_fn, data=None):
endpoint = self.get_fake_endpoint(path)
response = self.delete_requests(endpoint, suffix_param)

if self.wrapped:
return response

Expand Down Expand Up @@ -1440,16 +1491,15 @@ def process_command_by_id(self, operation_id, url_suffix, endpoint_args, data_fn


def get_schema_reference_from_name(self, plugin_name, schema_name):
for plugin in cfg_yaml[my_op_mode]:
if plugin_name == get_plugin_name_from_title(title = cfg_yaml[my_op_mode][plugin]['info']['title']):
for schema in cfg_yaml[my_op_mode][plugin]['components']['schemas']:
for plugin in cfg_yaml[self.my_op_mode]:
if plugin_name == get_plugin_name_from_title(title = cfg_yaml[self.my_op_mode][plugin]['info']['title']):
for schema in cfg_yaml[self.my_op_mode][plugin]['components']['schemas']:
if schema == schema_name:
return '#/components/schemas/' + schema

def get_schema_from_reference(self, plugin_name, ref):

schema_path_list = ref.strip('/#').split('/')
schema = cfg_yaml[my_op_mode][plugin_name][schema_path_list[0]]
schema = cfg_yaml[self.my_op_mode][plugin_name][schema_path_list[0]]

schema_ = schema.copy()

Expand Down
4 changes: 2 additions & 2 deletions jans-cli-tui/cli_tui/cli_style.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
"tab-nav-background": "fg:#b0e0e6 bg:#a9a9a9",
"tab-unselected": "fg:#b0e0e6 bg:#a9a9a9 underline",
"tab-selected": "fg:#000080 bg:#d3d3d3",

##scim
"scim-widget": "bg:black fg:white",

Expand Down Expand Up @@ -172,7 +172,7 @@ def get_color_for_style(style_name:str)->SimpleNamespace:
date_picker_Time = "green" ## only color
date_picker_TimeSelected = "black"

date_picker_calender_prevSelected = "red" #>black >> defult bold
date_picker_calender_prevSelected = "red" #>black >> default bold
date_picker_calenderNSelected = "blue"#>black
date_picker_calenderSelected = "red"

Expand Down
Loading