From 776bab37c1eab6fd548232504538078854ae546d Mon Sep 17 00:00:00 2001 From: Devrim Date: Fri, 10 Feb 2023 20:34:05 +0300 Subject: [PATCH] fix: jans-cli-tui working branch 7 (#3824) * feat: jans-cli-tui path browser (ref: #3823) * feat: jans-cli-tui implement path browser for save data * feat: jans-cli-tui agama project deployment * fix: jans-cli-tui remove unused path finder * fix: jans-cli-tui code smells * fix: jans-cli-tui code smells --- jans-cli-tui/cli_tui/jans_cli_tui.py | 81 +--- .../cli_tui/plugins/010_auth_server/agama.py | 170 +++++++ .../cli_tui/plugins/010_auth_server/main.py | 5 +- jans-cli-tui/cli_tui/utils/static.py | 1 + .../cli_tui/wui_components/jans_cli_dialog.py | 10 +- .../wui_components/jans_path_browser.py | 227 +++++++++ .../wui_components/jans_path_finder.py.txt | 451 ------------------ 7 files changed, 426 insertions(+), 519 deletions(-) create mode 100644 jans-cli-tui/cli_tui/plugins/010_auth_server/agama.py create mode 100644 jans-cli-tui/cli_tui/wui_components/jans_path_browser.py delete mode 100644 jans-cli-tui/cli_tui/wui_components/jans_path_finder.py.txt diff --git a/jans-cli-tui/cli_tui/jans_cli_tui.py b/jans-cli-tui/cli_tui/jans_cli_tui.py index ea6c652191f..81096402d6d 100755 --- a/jans-cli-tui/cli_tui/jans_cli_tui.py +++ b/jans-cli-tui/cli_tui/jans_cli_tui.py @@ -42,47 +42,6 @@ from prompt_toolkit.key_binding.bindings.focus import focus_next, focus_previous from prompt_toolkit.layout.containers import Float, HSplit, VSplit from prompt_toolkit.formatted_text import HTML, merge_formatted_text -from prompt_toolkit.completion import PathCompleter - - -class TextInputDialog: - def __init__(self, title="", label_text="", completer=None): - self.future = Future() - - def accept_text(buf): - get_app().layout.focus(ok_button) - buf.complete_state = None - return True - - def accept(): - self.future.set_result(self.text_area.text) - - def cancel(): - self.future.set_result(None) - - self.text_area = TextArea( - completer=completer, - multiline=False, - width=D(preferred=40), - accept_handler=accept_text, - ) - - ok_button = Button(text="OK", handler=accept) - cancel_button = Button(text="Cancel", handler=cancel) - - self.dialog = Dialog( - title=title, - body=HSplit([Label(text=label_text), self.text_area]), - buttons=[ok_button, cancel_button], - width=D(preferred=80), - modal=True, - ) - - def __pt_container__(self): - return self.dialog - - - from prompt_toolkit.layout.containers import ( Float, HSplit, @@ -123,6 +82,7 @@ def __pt_container__(self): from wui_components.jans_cli_dialog import JansGDialog from wui_components.jans_nav_bar import JansNavBar from wui_components.jans_message_dialog import JansMessageDialog +from wui_components.jans_path_browser import jans_file_browser_dialog, BrowseType home_dir = Path.home() config_dir = home_dir.joinpath('.config') @@ -158,6 +118,7 @@ def __init__(self): self.pbar_text = "" self.progressing_text = "" self.mouse_float=True + self.browse_path = '/' self.not_implemented = Frame( body=HSplit([Label(text=_("Not imlemented yet")), Button(text=_("MyButton"))], width=D()), @@ -814,9 +775,11 @@ async def show_dialog_as_float(self, dialog:Dialog, focus=None) -> None: 'Coroutine.' float_ = Float(content=dialog) self.root_layout.floats.append(float_) - self.layout.focus(dialog) + if focus: self.layout.focus(focus) + else: + self.layout.focus(dialog) self.invalidate() @@ -837,10 +800,12 @@ def show_jans_dialog(self, dialog:Dialog, focus=None) -> None: async def coroutine(): focused_before = self.layout.current_window result = await self.show_dialog_as_float(dialog, focus) - try: - self.layout.focus(focused_before) - except Exception: - self.layout.focus(self.center_frame) + + if not self.root_layout.floats: + try: + self.layout.focus(focused_before) + except Exception: + self.layout.focus(self.center_frame) return result @@ -865,30 +830,22 @@ def data_display_dialog(self, **params: Any) -> None: body = HSplit(data_display_widgets, style='class:jans-main-datadisplay') title = params.get('title') or params['selected'][0] - def do_save(dialog): + def do_save(path): try: - with open(dialog.body.text, 'w') as w: + with open(path, 'w') as w: w.write(text_area.text) - self.pbar_text = _("File {} was saved".format(dialog.body.text)) - self.show_message(_("Info"), _("File {} was successfully saved").format(dialog.body.text), tobefocused=self.center_container) + self.pbar_text = _("File {} was saved".format(text_area.text)) + self.show_message(_("Info"), _("File {} was successfully saved").format(path), tobefocused=self.center_container) except Exception as e: self.show_message(_("Error!"), _("An error ocurred while saving") + ":\n{}".format(str(e)), tobefocused=self.center_container) - - def save(dialog): - dialog.future.set_result('Save') - path_textarea = TextArea(style=cli_style.edit_text) - do_save_button = Button(_("OK"), handler=do_save) - buttons = [Button('Cancel'), do_save_button] - save_dialog = JansGDialog(self, title=_("Enter path of file to save"), body=path_textarea, buttons=buttons) - self.show_jans_dialog(save_dialog) - - - save_button = Button(_("Save"), handler=save) + def save(dialog): + file_browser_dialog = jans_file_browser_dialog(self, path=self.browse_path, browse_type=BrowseType.save_as, ok_handler=do_save) + self.show_jans_dialog(file_browser_dialog) + save_button = Button(_("Export"), handler=save) buttons = [Button('Close'), save_button] dialog = JansGDialog(self, title=title, body=body, buttons=buttons) - self.show_jans_dialog(dialog) def save_creds(self, dialog:Dialog) -> None: diff --git a/jans-cli-tui/cli_tui/plugins/010_auth_server/agama.py b/jans-cli-tui/cli_tui/plugins/010_auth_server/agama.py new file mode 100644 index 00000000000..b097a8b3151 --- /dev/null +++ b/jans-cli-tui/cli_tui/plugins/010_auth_server/agama.py @@ -0,0 +1,170 @@ +import os +import json +import asyncio +import zipfile + +from datetime import datetime +from typing import Any + +from prompt_toolkit.application import Application +from prompt_toolkit.eventloop import get_event_loop +from prompt_toolkit.layout.dimension import D +from prompt_toolkit.layout.containers import HSplit, VSplit, DynamicContainer +from prompt_toolkit.buffer import Buffer +from prompt_toolkit.formatted_text import HTML + +from utils.multi_lang import _ +from utils.utils import DialogUtils +from utils.static import cli_style, common_strings +from wui_components.jans_vetrical_nav import JansVerticalNav +from wui_components.jans_path_browser import jans_file_browser_dialog, BrowseType + +class Agama(DialogUtils): + def __init__( + self, + app: Application + ) -> None: + + self.app = self.myparent = app + self.data = [] + self.working_container = JansVerticalNav( + myparent=app, + headers=[_("Project Name"), _("Type"), _("Author"), _("Updated")], + preferred_size= self.app.get_column_sizes(.25, .25 , .3, .1, .1), + on_display=self.app.data_display_dialog, + on_delete=self.delete_agama_project, + selectes=0, + headerColor=cli_style.navbar_headcolor, + entriesColor=cli_style.navbar_entriescolor, + hide_headers = True + ) + + self.main_container = HSplit([ + VSplit([ + self.app.getButton(text=_("Get Projects"), name='oauth:agama:get', jans_help=_("Retreive all Agama Projects"), handler=self.get_agama_projects), + self.app.getTitledText(_("Search"), name='oauth:agama:search', jans_help=_(common_strings.enter_to_search), accept_handler=self.search_agama_project, style=cli_style.edit_text), + self.app.getButton(text=_("Upload Project"), name='oauth:agama:add', jans_help=_("To add a new Agama project press this button"), handler=self.upload_project), + ], + padding=3, + width=D(), + ), + DynamicContainer(lambda: self.working_container) + ], style=cli_style.container) + + + def update_agama_container(self, start_index=0, search_str=''): + + self.working_container.clear() + data_display = [] + + for agama in self.data.get('entries', []): + if search_str.lower(): + project_str = ' '.join(( + agama['details']['projectMetadata'].get('projectName'), + agama['details']['projectMetadata'].get('author', ''), + agama['details']['projectMetadata'].get('type', ''), + agama['details']['projectMetadata'].get('description', '') + )).lower() + if search_str not in project_str: + continue + + dt_object = datetime.fromisoformat(agama['createdAt']) + + data_display.append(( + agama['details']['projectMetadata'].get('projectName'), + agama['details']['projectMetadata'].get('type', '??'), + agama['details']['projectMetadata'].get('author', '??'), + '{:02d}/{:02d}/{}'.format(dt_object.day, dt_object.month, str(dt_object.year)[2:]) + )) + + if not data_display: + self.app.show_message(_("Oops"), _(common_strings.no_matching_result), tobefocused = self.main_container) + return + + self.working_container.hide_headers = False + for datum in data_display[start_index:start_index+self.app.entries_per_page]: + self.working_container.add_item(datum) + + self.app.layout.focus(self.working_container) + + def get_agama_projects(self, search_str=''): + async def coroutine(): + cli_args = {'operation_id': 'get-agama-dev-prj'} + self.app.start_progressing(_("Retreiving agama projects...")) + response = await get_event_loop().run_in_executor(self.app.executor, self.app.cli_requests, cli_args) + self.app.stop_progressing() + try: + self.data = response.json() + except Exception: + self.app.show_message(_(common_strings.error), HTML(_("Server reterned non json data {}").format(response.text)), tobefocused=self.app.center_container) + return + + if not 'entriesCount' in self.data: + self.app.show_message(_(common_strings.error), HTML(_("Server reterned unexpected data {}").format(self.data)), tobefocused=self.app.center_container) + return + + self.working_container.all_data = self.data.get('entries', []) + self.update_agama_container(search_str=search_str) + + asyncio.ensure_future(coroutine()) + + def upload_project(self): + + def do_upload_project(path): + try: + project_zip = zipfile.ZipFile(path) + except Exception: + self.app.show_message(_(common_strings.error), HTML(_("Can't open {} as zip file.").format(path)), tobefocused=self.app.center_container) + return + + try: + project_json = json.loads(project_zip.read('project.json')) + except Exception as e: + self.app.show_message(_(common_strings.error), HTML(_("Can't read project.json from zip file:\n {}").format(str(e))), tobefocused=self.app.center_container) + return + + if 'projectName' not in project_json: + self.app.show_message(_(common_strings.error), HTML(_("Property projectName does not exist in project.json.")), tobefocused=self.app.center_container) + return + + async def coroutine(): + cli_args = {'operation_id': 'post-agama-dev-studio-prj', 'data_fn': path, 'url_suffix':'name:{}'.format(project_json['projectName'])} + self.app.start_progressing(_("Uploading agama project...")) + await get_event_loop().run_in_executor(self.app.executor, self.app.cli_requests, cli_args) + self.app.stop_progressing() + self.get_agama_projects() + + asyncio.ensure_future(coroutine()) + + + file_browser_dialog = jans_file_browser_dialog(self.app, path=self.app.browse_path, browse_type=BrowseType.file, ok_handler=do_upload_project) + self.app.show_jans_dialog(file_browser_dialog) + + def search_agama_project(self, tbuffer:Buffer) -> None: + if 'entries' in self.data: + self.update_agama_container(search_str=tbuffer.text) + else: + self.get_agama_projects(search_str=tbuffer.text) + + def delete_agama_project(self, **kwargs: Any) -> None: + agama = self.data['entries'][kwargs['selected_idx']] + project_name = agama['details']['projectMetadata']['projectName'] + + def do_delete_agama_project(result): + async def coroutine(): + cli_args = {'operation_id': 'delete-agama-dev-studio-prj', 'url_suffix': 'name:{}'.format(agama['details']['projectMetadata']['projectName'])} + self.app.start_progressing(_("Deleting agama project {}".format(project_name))) + await get_event_loop().run_in_executor(self.app.executor, self.app.cli_requests, cli_args) + self.app.stop_progressing() + self.get_agama_projects() + + asyncio.ensure_future(coroutine()) + + dialog = self.app.get_confirm_dialog( + message = HTML(_("Are you sure want to delete Agama Project {}?").format(project_name)), + confirm_handler=do_delete_agama_project + ) + + self.app.show_jans_dialog(dialog) + + diff --git a/jans-cli-tui/cli_tui/plugins/010_auth_server/main.py b/jans-cli-tui/cli_tui/plugins/010_auth_server/main.py index 641875555a1..33dae74813a 100755 --- a/jans-cli-tui/cli_tui/plugins/010_auth_server/main.py +++ b/jans-cli-tui/cli_tui/plugins/010_auth_server/main.py @@ -37,6 +37,7 @@ from edit_client_dialog import EditClientDialog from edit_scope_dialog import EditScopeDialog from ssa import SSA +from agama import Agama from prompt_toolkit.widgets import ( HorizontalLine, @@ -62,6 +63,7 @@ def __init__( self.search_text= None self.oauth_update_properties_start_index = 0 self.ssa = SSA(app) + self.agama = Agama(app) self.app_configuration = {} self.oauth_containers = {} @@ -185,6 +187,7 @@ def oauth_prepare_containers(self) -> None: ],style='class:outh_containers_scopes') self.oauth_containers['ssa'] = self.ssa.main_container + self.oauth_containers['agama'] = self.agama.main_container self.oauth_containers['logging'] = DynamicContainer(lambda: self.oauth_data_container['logging']) self.oauth_main_container = HSplit([ @@ -200,7 +203,7 @@ def oauth_prepare_navbar(self) -> None: """ self.nav_bar = JansNavBar( self.app, - entries=[('clients', 'C[l]ients'), ('scopes', 'Sc[o]pes'), ('keys', '[K]eys'), ('defaults', '[D]efaults'), ('properties', 'Properti[e]s'), ('logging', 'Lo[g]ging'), ('ssa', '[S]SA')], + entries=[('clients', 'C[l]ients'), ('scopes', 'Sc[o]pes'), ('keys', '[K]eys'), ('defaults', '[D]efaults'), ('properties', 'Properti[e]s'), ('logging', 'Lo[g]ging'), ('ssa', '[S]SA'), ('agama', 'Aga[m]a')], selection_changed=self.oauth_nav_selection_changed, select=0, jans_name='oauth:nav_bar' diff --git a/jans-cli-tui/cli_tui/utils/static.py b/jans-cli-tui/cli_tui/utils/static.py index 8bffee7f711..c4757fd8655 100755 --- a/jans-cli-tui/cli_tui/utils/static.py +++ b/jans-cli-tui/cli_tui/utils/static.py @@ -23,3 +23,4 @@ class cli_style: class common_strings: enter_to_search = "Press enter to perform search" no_matching_result = "No matching result" + error = "Error!" diff --git a/jans-cli-tui/cli_tui/wui_components/jans_cli_dialog.py b/jans-cli-tui/cli_tui/wui_components/jans_cli_dialog.py index fa4cd5f9a5a..b7cbb8384bd 100755 --- a/jans-cli-tui/cli_tui/wui_components/jans_cli_dialog.py +++ b/jans-cli-tui/cli_tui/wui_components/jans_cli_dialog.py @@ -1,4 +1,3 @@ -import json from functools import partial from asyncio import Future from prompt_toolkit.widgets import Button, Dialog @@ -14,10 +13,10 @@ class JansGDialog: def __init__( self, parent, - body: Optional[AnyContainer]=None, - title: Optional[str]= '', - buttons: Optional[Sequence[Button]]=[], - width: AnyDimension=None + body: Optional[AnyContainer] = None, + title: Optional[str] = '', + buttons: Optional[Sequence[Button]] = None, + width: AnyDimension = None )-> Dialog: """init for JansGDialog @@ -35,6 +34,7 @@ def __init__( self.future = Future() self.body = body self.myparent = parent + self.title = title if not width: width = parent.dialog_width diff --git a/jans-cli-tui/cli_tui/wui_components/jans_path_browser.py b/jans-cli-tui/cli_tui/wui_components/jans_path_browser.py new file mode 100644 index 00000000000..5c79e037be0 --- /dev/null +++ b/jans-cli-tui/cli_tui/wui_components/jans_path_browser.py @@ -0,0 +1,227 @@ +from enum import Enum +from pathlib import Path +from typing import Callable, Optional + +from prompt_toolkit.formatted_text import HTML, AnyFormattedText, merge_formatted_text +from prompt_toolkit.application import Application +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.layout import FormattedTextControl, Window +from prompt_toolkit.widgets import Label, Button, TextArea +from prompt_toolkit.layout.containers import HSplit, VSplit, ScrollOffsets, AnyContainer +from prompt_toolkit.layout.margins import ScrollbarMargin + +from utils.multi_lang import _ +from wui_components.jans_cli_dialog import JansGDialog + + +class BrowseType(Enum): + """Enumeration for path browser type""" + directory = 0 + file = 1 + save_as = 2 + + +class JansPathBrowserWidget: + """ + Creates a path browser container widget. + + :param path: Path to browse + :param browse_type: Type of browsing + :param height: Height of widget, default to 10 + """ + + + def __init__(self, + path: str, + browse_type: BrowseType, + height: int = 10 + ) -> None: + """init for JansPathBrowserWidget""" + + self.path = Path(path) + self.entries = [] + self.browse_type = browse_type + self.height = height + + self.selected_line = 0 + browsed_dir = Window(FormattedTextControl(lambda: self.path.as_posix())) + path_selection_window = Window( + content=FormattedTextControl( + text=self._get_formatted_text, + focusable=True, + key_bindings=self._get_key_bindings(), + ), + scroll_offsets=ScrollOffsets(top=2, bottom=2), + right_margins=[ScrollbarMargin(display_arrows=True),], + cursorline=False, + height=self.height + ) + container_content = [VSplit([Label(_("Directory:"), width=11), browsed_dir]), path_selection_window] + focusable = False if browse_type == BrowseType.file else True + self.file_name = TextArea(multiline=False, focusable=focusable) + + if browse_type == BrowseType.file: + self.file_name.read_only = True + + if browse_type in (BrowseType.save_as, BrowseType.file): + container_content.append(VSplit([Label("File name:", width=11), self.file_name])) + + self.container = HSplit(container_content) + + + def _path_as_str(self, path:str, i:int) -> str: + """Converts name of path object to string with icon based on type""" + s = path.name + if path.is_dir(): + if not i and self.path.as_posix() != '/': + s = '..' + return chr(128448) +' ' + s + return chr(128441) + ' ' + s + + + def _get_formatted_text(self) -> AnyFormattedText: + """Returns formatted text for list""" + + self._update_path_content() + result = [] + + for i, path in enumerate(self.entries): + if i == self.selected_line: + result.append([("[SetCursorPosition]", "")]) + if path.is_dir(): + result.append(HTML(''.format('#ADD8E6', self._path_as_str(path, i)))) + else: + result.append(HTML(''.format('#ADD8E6', self._path_as_str(path, i)))) + else: + result.append(HTML('{}'.format(self._path_as_str(path, i)))) + result.append("\n") + + return merge_formatted_text(result) + + def _update_path_content(self) -> None: + """Updates entries for current path""" + + self.entries = [] + files = [] + dirs = [] + + for path_ in self.path.glob('*'): + if path_.is_dir(): + dirs.append(path_) + elif self.browse_type != BrowseType.directory: + files.append(path_) + + self.entries = sorted(dirs) + sorted(files) + + if self.path.as_posix() != '/': + self.entries.insert(0, self.path.parent) + + + def _set_file_name(self) -> None: + """Sets value of file_name entry if path under curser is file""" + if self.browse_type in (BrowseType.save_as, BrowseType.file): + self.file_name.text = self.entries[self.selected_line].name if self.entries[self.selected_line].is_file() else '' + + def _get_key_bindings(self) -> KeyBindings: + """Returns key bindings for list""" + + kb = KeyBindings() + + @kb.add("up") + def _go_up(event) -> None: + if self.selected_line < len(self.entries) and self.selected_line > 0: + self.selected_line = self.selected_line - 1 + self._set_file_name() + + @kb.add("down") + def _go_down(event) -> None: + if self.selected_line < len(self.entries) - 1: + self.selected_line = self.selected_line + 1 + self._set_file_name() + + @kb.add("enter") + def _enter(event) -> None: + selected_path = self.entries[self.selected_line] + if selected_path.is_dir(): + self.selected_line = 0 + self.path = selected_path + + @kb.add("pageup") + def _pageup(event) -> None: + for _ in range(self.height - 1): + _go_up(event) + + @kb.add("pagedown") + def _pagedown(event) -> None: + for _ in range(self.height - 1): + _go_down(event) + + return kb + + def __pt_container__(self) -> AnyContainer: + return self.container + + +def jans_file_browser_dialog( + app: Application , + path: str = '/', + browse_type: Optional[BrowseType] = BrowseType.save_as, + ok_handler: Optional[Callable] = None + ) -> JansGDialog: + """Functo to create a Jans File Browser Dialog + + Args: + app (Application): JansCliApp + browse_type (BrowseType, optional): Type of browsing + ok_handler (collable, optional): Callable when OK button is pressed + """ + + browse_widget = JansPathBrowserWidget(path, browse_type) + + + def call_ok_handler(dialog): + dialog.future.set_result(True) + if browse_widget.path.is_file(): + app.browse_path = browse_widget.path.parent.as_posix() + else: + app.browse_path = browse_widget.path.as_posix() + + if ok_handler: + if browse_type in (BrowseType.file, BrowseType.save_as): + ok_handler(browse_widget.path.joinpath(browse_widget.file_name.text).as_posix()) + else: + ok_handler(browse_widget.path.as_posix()) + + def confirm_handler(dialog): + call_ok_handler(dialog.parent_dialog) + + def my_ok_handler(dialog): + if browse_type == BrowseType.save_as: + if not browse_widget.file_name.text: + return + if browse_widget.path.joinpath(browse_widget.file_name.text).exists(): + confirm_dialog = app.get_confirm_dialog(HTML(_("A file named {} already exists. Do you want to replace it?")).format(browse_widget.file_name.text), confirm_handler=confirm_handler) + confirm_dialog.parent_dialog = dialog + app.show_jans_dialog(confirm_dialog) + return + call_ok_handler(dialog) + elif browse_type == BrowseType.file: + if not browse_widget.file_name.text: + return + call_ok_handler(dialog) + elif BrowseType.directory: + call_ok_handler(dialog) + + if browse_type == BrowseType.directory: + title = _("Select Directory") + elif browse_type == BrowseType.file: + title = _("Select File") + elif browse_type == BrowseType.save_as: + title = _("Save As") + + ok_button = Button(_("OK"), handler=my_ok_handler) + ok_button.keep_dialog = True + cancel_button = Button(_("Cancel")) + dialog = JansGDialog(app, title=title, body=browse_widget, buttons=[ok_button, cancel_button]) + + return dialog diff --git a/jans-cli-tui/cli_tui/wui_components/jans_path_finder.py.txt b/jans-cli-tui/cli_tui/wui_components/jans_path_finder.py.txt deleted file mode 100644 index 4c9d85a8b8b..00000000000 --- a/jans-cli-tui/cli_tui/wui_components/jans_path_finder.py.txt +++ /dev/null @@ -1,451 +0,0 @@ -#!/usr/bin/env python - -from prompt_toolkit.application import Application -from prompt_toolkit.application.current import get_app -from prompt_toolkit.key_binding import KeyBindings -from prompt_toolkit.key_binding.bindings.focus import focus_next, focus_previous -from prompt_toolkit.layout import ( - Float, - FloatContainer, - HSplit, - Layout, -) -from prompt_toolkit.application.current import get_app -from prompt_toolkit.key_binding import KeyBindings -from prompt_toolkit.layout.containers import Float, HSplit, Window -from prompt_toolkit.layout.controls import FormattedTextControl -from prompt_toolkit.formatted_text import HTML, merge_formatted_text -from prompt_toolkit.layout.margins import ScrollbarMargin -from prompt_toolkit.key_binding.bindings.focus import focus_next -from prompt_toolkit.layout.dimension import D -from prompt_toolkit.layout.containers import ( - Float, - HSplit, - FloatContainer, - Window, - FormattedTextControl -) -from prompt_toolkit.mouse_events import MouseEvent, MouseEventType -from prompt_toolkit.data_structures import Point - -from prompt_toolkit.application.current import get_app -from prompt_toolkit.key_binding import KeyBindings -from prompt_toolkit.layout.controls import FormattedTextControl -from prompt_toolkit.formatted_text import HTML, merge_formatted_text -from prompt_toolkit.layout.margins import ScrollbarMargin -from prompt_toolkit.key_binding.bindings.focus import focus_next, focus_previous -from prompt_toolkit.layout.dimension import D -from prompt_toolkit.widgets import Button, Label -from prompt_toolkit.layout.containers import ( - Float, - HSplit, - VSplit, - DynamicContainer, - Window -) -from typing import Optional -from prompt_toolkit.formatted_text import AnyFormattedText -from prompt_toolkit.widgets import Button, Dialog -from prompt_toolkit.key_binding.key_bindings import KeyBindings, KeyBindingsBase - -import glob - -#-------------------------------------------------------------------------------# -#-------------------------------------------------------------------------------# -#-------------------------------------------------------------------------------# - -class JansPathFinder: - """_summary_ - """ - - def __init__( - self, - )-> HSplit: - - - self.cord_y = 0 - self.cord_x = 0 - self.old_cord_x = 0 - self.selected_cord = (0, 0) - - self.start_dir = 0 - self.start_file = 0 - self.flag_y =0 - self.list_dir = [] - self.select_flag = True - - self.current_dir = '/' - self.current_file = '' - self.list_dir.append(self.current_dir) - # -----------------------------------------------------------------------------------------------# - # -----------------------------------------------------------------------------------------------# - # -----------------------------------------------------------------------------------------------# - - self.current_dir_label = Label(text=self.current_dir,width=len(self.current_dir)) - self.current_file_label = Label(text=self.current_file,width=len(self.current_file)) - - self.dire_container = Window( - content=FormattedTextControl( - text=self._get_directories, - ), - height=5, - cursorline=False, - style="fg:#ff0000 bg:#ffffff", ### days window style - right_margins=[ScrollbarMargin(display_arrows=True),], - wrap_lines=True, - ) - - self.file_container = Window( - content=FormattedTextControl( - text=self._get_files, - focusable=True, - ), - height=5, - cursorline=False, - style="fg:#0000ff bg:#ffffff", ### time window style - right_margins=[ScrollbarMargin(display_arrows=True),], - wrap_lines=True - ) - - self.current_directories = self.get_direct() - - self.current_files = self.get_files() - - self.container =HSplit(children=[ - VSplit([ - Label(text="Dir: ",width=len("Dir: ")), - Window(width=2,char=' '), - DynamicContainer(lambda: self.current_dir_label), - ],style="fg:#000000 bg:#ffffff",padding=1), ### Month and year window style - #DynamicContainer(lambda: self.depug), - - - # -----------------------------------------------------------------------------------------------# - # ----------------------------------------- Directories -----------------------------------------# - # -----------------------------------------------------------------------------------------------# - DynamicContainer(lambda: self.dire_container), - - # -----------------------------------------------------------------------------------------------# - # ----------------------------------------- Files -----------------------------------------------# - # -----------------------------------------------------------------------------------------------# - VSplit([ - Label(text="Files: ",width=len("Files: ")), - Window(width=2,char=' '), - DynamicContainer(lambda: self.current_file_label), - ],style="fg:#000000 bg:#ffffff",padding=1), ### Month and year window style - DynamicContainer(lambda: self.file_container), - - ]) - - def get_direct (self)-> list: - dir_list = [] - - self.len_directories = len(glob.glob("/{}/*/".format(self.current_dir))) - - for dir in (glob.glob("/{}/*/".format(self.current_dir))): - - dir_list.append('{}'.format(dir.replace('/{}/'.format(self.current_dir),'')).replace('//','/')) - - return dir_list - - - def get_files(self)-> list: - files_list = [] - - self.len_files = len(glob.glob("/{}/*.*".format(self.current_dir))) - - for dir in (glob.glob("/{}/*.*".format(self.current_dir))): - - files_list.append('{}'.format(dir.replace('/{}/'.format(self.current_dir),'')).replace('//','/')) - - return files_list - - - def _get_files(self)-> AnyFormattedText: - result = [] - files_list = [] - - for i, dir in enumerate(self.current_files[0+self.start_file:5+self.start_file]): - if i == self.cord_y and self.select_flag==True: - files_list.append(HTML(''.format(dir.replace('/{}/'.format(self.current_dir),'')))) - else: - files_list.append(HTML('{}'.format(dir.replace('/{}/'.format(self.current_dir),'')))) - - result= (files_list) - - result.append("\n") - - return merge_formatted_text(result) - - def _get_directories(self)-> AnyFormattedText: - result = [] - dir_list = [] - - for i, dir in enumerate(self.current_directories[0+self.start_dir:5+self.start_dir]): - if i == self.cord_y and self.select_flag == False: - dir_list.append(HTML(''.format(dir.replace('/{}/'.format(self.current_dir),'')))) - else: - dir_list.append(HTML('{}'.format(dir.replace('/{}/'.format(self.current_dir),'')))) - - result= (dir_list) - - result.append("\n") - - return merge_formatted_text(result) - - - def up(self)-> None: - - if self.select_flag == False: ## Dir - - if self.cord_y <= 0 and self.flag_y > 0 : - self.flag_y -=1 - self.start_dir -=1 - - elif self.cord_y -1 >=0: - self.cord_y -=1 - self.flag_y -=1 - self.current_dir = self.current_directories[0+self.start_dir:5+self.start_dir][self.cord_y] - - else: ## file - if self.cord_y <= 0 and self.flag_y > 0 : - self.flag_y -=1 - self.start_file -=1 - - elif self.cord_y -1 >=0: - self.cord_y -=1 - self.flag_y -=1 - - self.current_files = self.current_files[0+self.start_file:5+self.start_file] - - def down(self)-> None: - if self.select_flag == False : - if self.cord_y +1 < 5 : - self.cord_y +=1 - self.flag_y +=1 - - elif self.flag_y < len(self.current_directories) -1: - - self.flag_y +=1 - self.start_dir +=1 - - self.current_dir = self.current_directories[0+self.start_dir:5+self.start_dir][self.cord_y] - else: - if self.cord_y +1 < 5 : - self.cord_y +=1 - self.flag_y +=1 - - elif self.flag_y < len(self.current_files) -1: - - self.flag_y +=1 - self.start_file +=1 - - self.current_files = self.current_files[0+self.start_file:5+self.start_file] - - def enter(self)-> None: - # self.current_dir = self.current_directories[0+self.start_dir:5+self.start_dir][self.cord_y] - # self.current_dir_label =Label(text=self.current_dir,width=len(self.current_dir)) - - ### current files - if self.current_files: - self.current_file = self.current_files[0+self.start_file:5+self.start_file][self.cord_y] - self.current_file_label = Label(text=self.current_file,width=len(self.current_file)) - else: - self.current_file = 'No Files' - self.current_file_label = Label(text=self.current_file,width=len(self.current_file)) - - - self.start_dir = 0 - self.cord_y = 0 - - - def right(self)-> None: - self.current_dir = self.current_directories[0+self.start_dir:5+self.start_dir][self.cord_y] - self.current_dir_label =Label(text=self.current_dir,width=len(self.current_dir)) - - ### current files - if self.current_files: - self.current_file = self.current_files[0+self.start_file:5+self.start_file][self.cord_y] - self.current_file_label = Label(text=self.current_file,width=len(self.current_file)) - else: - self.current_file = 'No Files' - self.current_file_label = Label(text=self.current_file,width=len(self.current_file)) - - - self.start_dir = 0 - self.cord_y = 0 - self.flag_y = 0 - self.current_files = self.get_files() - - - if len(glob.glob("/{}/*/".format(self.current_dir))) >= 1: - self.list_dir.append(self.current_dir) - self.current_directories = self.get_direct() - - else: - pass - - - def left(self)-> None: - # self.current_dir2 = self.current_directories[0+self.start_dir:5+self.start_dir][self.cord_y] - - if len(self.list_dir) != 1: - self.start_dir = 0 - self.cord_y = 0 - self.flag_y = 0 - - - self.list_dir.remove(self.list_dir[-1]) - self.current_dir = self.list_dir[-1] - self.current_directories = self.get_direct() - self.current_files = self.get_files() - - - self.current_file = 'No Files' - self.current_file_label = Label(text=self.current_file,width=len(self.current_file)) - - self.current_dir_label =Label(text=self.current_dir,width=len(self.current_dir)) - - - def next(self)-> None: - self.select_flag = not self.select_flag - self.cord_y = 0 - self.flag_y = 0 - - - - def __pt_container__(self)-> Dialog: - return self.container - - -class PathFinderWidget: - """This is a Dape Picker widget to select exact time and date - """ - def __init__( - self, - value:str, - ) -> Window: - - if value: - self.text = value - else: - self.text = "Enter to Browse Path" - - - self.value = str(value) - - self.dropdown = True - self.window = Window( - content=FormattedTextControl( - text=self._get_text, - focusable=True, - key_bindings=self._get_key_bindings(), - ), height= 5) #D()) #5 ## large sized enties get >> (window too small) - - self.select_box = JansPathFinder() - self.select_box_float = Float(content=self.select_box, xcursor=True, ycursor=True) - - @property - def value(self): - """Getter for the value property - - Returns: - str: The selected value - """ - if self.text != "Enter to Select": - return self.text - - @value.setter - def value( - self, - value:str, - )-> None: - self._value = self.value - - - def _get_text(self)-> AnyFormattedText: - """To get The selected value - - Returns: - str: The selected value - """ - - if get_app().layout.current_window is self.window: - return HTML('> <'.format('#00FF00', self.text)) - return '> {} <'.format(self.text) - - def _get_key_bindings(self)-> KeyBindingsBase: - """All key binding for the Dialog with Navigation bar - - Returns: - KeyBindings: The method according to the binding key - """ - - kb = KeyBindings() - - def _focus_next(event): - focus_next(event) - - def _focus_pre(event): - focus_previous(event) - - @kb.add("enter") - def _enter(event) -> None: - - if self.select_box_float not in get_app().layout.container.floats: - get_app().layout.container.floats.append(self.select_box_float) - self.select_box.enter() - else: - - self.text= self.select_box.current_file - - - get_app().layout.container.floats.remove(self.select_box_float) - - @kb.add("up") - def _up(event): - if self.select_box_float in get_app().layout.container.floats: - self.select_box.up() - - @kb.add("down") - def _down(event): - if self.select_box_float in get_app().layout.container.floats: - self.select_box.down() - - @kb.add("right") - def _right(event): - if self.select_box_float in get_app().layout.container.floats: - self.select_box.right() - - @kb.add("left") - def _left(event): - if self.select_box_float in get_app().layout.container.floats: - self.select_box.left() - - @kb.add("tab") - def _tab(event): - if self.select_box_float in get_app().layout.container.floats: - self.select_box.next() - else : - _focus_next(event) - - @kb.add("s-tab") - def _tab(event): - if self.select_box_float in get_app().layout.container.floats: - self.select_box.next() - else : - _focus_pre(event) - - - @kb.add("escape") - def _escape(event): - if self.select_box_float in get_app().layout.container.floats: - app = get_app() - app.layout.container.floats.remove(self.select_box_float) - return kb - - def __pt_container__(self)-> Window: - return self.window - - -