diff --git a/CHANGELOG.md b/CHANGELOG.md index 85bca453a0..a49d6a75a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added +- Added support for in-band terminal resize protocol https://github.com/Textualize/textual/pull/5217 + +### Changed + +- `Driver.process_event` is now `Driver.process_message` https://github.com/Textualize/textual/pull/5217 +- `Driver.send_event` is now `Driver.send_message` https://github.com/Textualize/textual/pull/5217 - Added `can_focus` and `can_focus_children` parameters to scrollable container types. https://github.com/Textualize/textual/pull/5226 - Added `textual.lazy.Reveal` https://github.com/Textualize/textual/pull/5226 - Added `Screen.action_blur` https://github.com/Textualize/textual/pull/5226 diff --git a/src/textual/_xterm_parser.py b/src/textual/_xterm_parser.py index 2b27c5eb61..0391cfcded 100644 --- a/src/textual/_xterm_parser.py +++ b/src/textual/_xterm_parser.py @@ -15,7 +15,7 @@ # When trying to determine whether the current sequence is a supported/valid # escape sequence, at which length should we give up and consider our search # to be unsuccessful? -_MAX_SEQUENCE_SEARCH_THRESHOLD = 20 +_MAX_SEQUENCE_SEARCH_THRESHOLD = 32 _re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"( None: elif sequence == BRACKETED_PASTE_END: bracketed_paste = False break + if match := _re_in_band_window_resize.fullmatch(sequence): + height, width, pixel_height, pixel_width = [ + group.partition(":")[0] for group in match.groups() + ] + resize_event = events.Resize.from_dimensions( + (int(width), int(height)), + (int(pixel_width), int(pixel_height)), + ) + on_token(resize_event) + break if not bracketed_paste: # Check cursor position report @@ -246,9 +259,14 @@ def send_escape() -> None: mode_report_match = _re_terminal_mode_response.match(sequence) if mode_report_match is not None: mode_id = mode_report_match["mode_id"] - setting_parameter = mode_report_match["setting_parameter"] - if mode_id == "2026" and int(setting_parameter) > 0: + setting_parameter = int(mode_report_match["setting_parameter"]) + if mode_id == "2026" and setting_parameter > 0: on_token(messages.TerminalSupportsSynchronizedOutput()) + elif mode_id == "2048": + in_band_event = messages.TerminalSupportInBandWindowResize.from_setting_parameter( + setting_parameter + ) + on_token(in_band_event) break if self._debug_log_file is not None: @@ -265,7 +283,7 @@ def _sequence_to_key_events(self, sequence: str) -> Iterable[events.Key]: Keys """ - if (match := _re_extended_key.match(sequence)) is not None: + if (match := _re_extended_key.fullmatch(sequence)) is not None: number, modifiers, end = match.groups() number = number or 1 if not (key := FUNCTIONAL_KEYS.get(f"{number}{end}", "")): diff --git a/src/textual/app.py b/src/textual/app.py index 33ae992f56..ee2aa45992 100644 --- a/src/textual/app.py +++ b/src/textual/app.py @@ -786,6 +786,9 @@ def __init__( self._hover_effects_timer: Timer | None = None + self._resize_event: events.Resize | None = None + """A pending resize event, sent on idle.""" + self._css_update_count: int = 0 """Incremented when CSS is invalidated.""" @@ -1685,7 +1688,7 @@ async def _press_keys(self, keys: Iterable[str]) -> None: char = key if len(key) == 1 else None key_event = events.Key(key, char) key_event.set_sender(app) - driver.send_event(key_event) + driver.send_message(key_event) await wait_for_idle(0) await app._animator.wait_until_complete() await wait_for_idle(0) @@ -3841,9 +3844,7 @@ async def _on_key(self, event: events.Key) -> None: async def _on_resize(self, event: events.Resize) -> None: event.stop() - self.screen.post_message(event) - for screen in self._background_screens: - screen.post_message(event) + self._resize_event = event async def _on_app_focus(self, event: events.AppFocus) -> None: """App has focus.""" @@ -4473,3 +4474,21 @@ def _on_delivery_failed(self, event: events.DeliveryComplete) -> None: self.notify( "Failed to save screenshot", title="Screenshot", severity="error" ) + + @on(messages.TerminalSupportInBandWindowResize) + def _on_terminal_supports_in_band_window_resize( + self, message: messages.TerminalSupportInBandWindowResize + ) -> None: + """There isn't much we can do with this information currently, so + we will just log it. + """ + self.log.debug(message) + + def _on_idle(self) -> None: + """Send app resize events on idle, so we don't do more resizing that necessary.""" + event = self._resize_event + if event is not None: + self._resize_event = None + self.screen.post_message(event) + for screen in self._background_screens: + screen.post_message(event) diff --git a/src/textual/driver.py b/src/textual/driver.py index 43880153fc..b16b982b5e 100644 --- a/src/textual/driver.py +++ b/src/textual/driver.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Literal, TextIO -from textual import events, log +from textual import events, log, messages from textual.events import MouseUp if TYPE_CHECKING: @@ -64,46 +64,46 @@ def can_suspend(self) -> bool: """Can this driver be suspended?""" return False - def send_event(self, event: events.Event) -> None: - """Send an event to the target app. + def send_message(self, message: messages.Message) -> None: + """Send a message to the target app. Args: - event: An event. + message: A message. """ asyncio.run_coroutine_threadsafe( - self._app._post_message(event), loop=self._loop + self._app._post_message(message), loop=self._loop ) - def process_event(self, event: events.Event) -> None: - """Perform additional processing on an event, prior to sending. + def process_message(self, message: messages.Message) -> None: + """Perform additional processing on a message, prior to sending. Args: - event: An event to send. + event: A message to process. """ # NOTE: This runs in a thread. # Avoid calling methods on the app. - event.set_sender(self._app) + message.set_sender(self._app) if self.cursor_origin is None: offset_x = 0 offset_y = 0 else: offset_x, offset_y = self.cursor_origin - if isinstance(event, events.MouseEvent): - event.x -= offset_x - event.y -= offset_y - event.screen_x -= offset_x - event.screen_y -= offset_y - - if isinstance(event, events.MouseDown): - if event.button: - self._down_buttons.append(event.button) - elif isinstance(event, events.MouseUp): - if event.button and event.button in self._down_buttons: - self._down_buttons.remove(event.button) - elif isinstance(event, events.MouseMove): + if isinstance(message, events.MouseEvent): + message.x -= offset_x + message.y -= offset_y + message.screen_x -= offset_x + message.screen_y -= offset_y + + if isinstance(message, events.MouseDown): + if message.button: + self._down_buttons.append(message.button) + elif isinstance(message, events.MouseUp): + if message.button and message.button in self._down_buttons: + self._down_buttons.remove(message.button) + elif isinstance(message, events.MouseMove): if ( self._down_buttons - and not event.button + and not message.button and self._last_move_event is not None ): # Deduplicate self._down_buttons while preserving order. @@ -111,24 +111,24 @@ def process_event(self, event: events.Event) -> None: self._down_buttons.clear() move_event = self._last_move_event for button in buttons: - self.send_event( + self.send_message( MouseUp( x=move_event.x, y=move_event.y, delta_x=0, delta_y=0, button=button, - shift=event.shift, - meta=event.meta, - ctrl=event.ctrl, + shift=message.shift, + meta=message.meta, + ctrl=message.ctrl, screen_x=move_event.screen_x, screen_y=move_event.screen_y, - style=event.style, + style=message.style, ) ) - self._last_move_event = event + self._last_move_event = message - self.send_event(event) + self.send_message(message) @abstractmethod def write(self, data: str) -> None: diff --git a/src/textual/drivers/linux_driver.py b/src/textual/drivers/linux_driver.py index 40c35f6bc7..abc011b9fd 100644 --- a/src/textual/drivers/linux_driver.py +++ b/src/textual/drivers/linux_driver.py @@ -20,6 +20,8 @@ from textual.driver import Driver from textual.drivers._writer_thread import WriterThread from textual.geometry import Size +from textual.message import Message +from textual.messages import TerminalSupportInBandWindowResize if TYPE_CHECKING: from textual.app import App @@ -59,6 +61,7 @@ def __init__( # need to know that we came in here via a SIGTSTP; this flag helps # keep track of this. self._must_signal_resume = False + self._in_band_window_resize = False # Put handlers for SIGTSTP and SIGCONT in place. These are necessary # to support the user pressing Ctrl+Z (or whatever the dev might @@ -135,6 +138,22 @@ def _enable_bracketed_paste(self) -> None: """Enable bracketed paste mode.""" self.write("\x1b[?2004h") + def _query_in_band_window_resize(self) -> None: + self.write("\x1b[?2048$p") + + def _enable_in_band_window_resize(self) -> None: + self.write("\x1b[?2048h") + + def _enable_line_wrap(self) -> None: + self.write("\x1b[?7h") + + def _disable_line_wrap(self) -> None: + self.write("\x1b[?7l") + + def _disable_in_band_window_resize(self) -> None: + if self._in_band_window_resize: + self.write("\x1b[?2048l") + def _disable_bracketed_paste(self) -> None: """Disable bracketed paste mode.""" self.write("\x1b[?2004l") @@ -197,6 +216,8 @@ def _stop_again(*_) -> None: loop = asyncio.get_running_loop() def send_size_event() -> None: + if self._in_band_window_resize: + return terminal_size = self._get_terminal_size() width, height = terminal_size textual_size = Size(width, height) @@ -253,7 +274,9 @@ def on_terminal_resize(signum, stack) -> None: send_size_event() self._key_thread.start() self._request_terminal_sync_mode_support() + self._query_in_band_window_resize() self._enable_bracketed_paste() + self._disable_line_wrap() # Appears to fix an issue enabling mouse support in iTerm 3.5.0 self._enable_mouse_support() @@ -330,6 +353,8 @@ def disable_input(self) -> None: def stop_application_mode(self) -> None: """Stop application mode, restore state.""" self._disable_bracketed_paste() + self._enable_line_wrap() + self._disable_in_band_window_resize() self.disable_input() if self.attrs_before is not None: @@ -401,9 +426,9 @@ def process_selector_events( # This can occur if the stdin is piped break for event in feed(unicode_data): - self.process_event(event) + self.process_message(event) for event in tick(): - self.process_event(event) + self.process_message(event) try: while not self.exit_event.is_set(): @@ -418,3 +443,22 @@ def process_selector_events( pass except ParseError: pass + + def process_message(self, message: Message) -> None: + # intercept in-band window resize + if isinstance(message, TerminalSupportInBandWindowResize): + # If it is supported, enabled it + if message.supported and not message.enabled: + self._enable_in_band_window_resize() + self._in_band_window_resize = message.supported + elif message.enabled: + self._in_band_window_resize = message.supported + # Send up-to-date message + super().process_message( + TerminalSupportInBandWindowResize( + message.supported, self._in_band_window_resize + ) + ) + return + + super().process_message(message) diff --git a/src/textual/drivers/linux_inline_driver.py b/src/textual/drivers/linux_inline_driver.py index 22bb349505..03e55d528f 100644 --- a/src/textual/drivers/linux_inline_driver.py +++ b/src/textual/drivers/linux_inline_driver.py @@ -155,12 +155,12 @@ def process_selector_events( if isinstance(event, events.CursorPosition): self.cursor_origin = (event.x, event.y) else: - self.process_event(event) + self.process_message(event) for event in tick(): if isinstance(event, events.CursorPosition): self.cursor_origin = (event.x, event.y) else: - self.process_event(event) + self.process_message(event) try: while not self.exit_event.is_set(): diff --git a/src/textual/drivers/web_driver.py b/src/textual/drivers/web_driver.py index 7e1e0ff0b4..2dd319f54d 100644 --- a/src/textual/drivers/web_driver.py +++ b/src/textual/drivers/web_driver.py @@ -195,12 +195,12 @@ def run_input_thread(self) -> None: if packet_type == "D": # Treat as stdin for event in parser.feed(decode(payload)): - self.process_event(event) + self.process_message(event) else: # Process meta information separately self._on_meta(packet_type, payload) for event in parser.tick(): - self.process_event(event) + self.process_message(event) except _ExitInput: pass except Exception: diff --git a/src/textual/drivers/windows_driver.py b/src/textual/drivers/windows_driver.py index 6c2f8ae392..3a908afbe7 100644 --- a/src/textual/drivers/windows_driver.py +++ b/src/textual/drivers/windows_driver.py @@ -101,7 +101,7 @@ def start_application_mode(self) -> None: self._enable_bracketed_paste() self._event_thread = win32.EventMonitor( - loop, self._app, self.exit_event, self.process_event + loop, self._app, self.exit_event, self.process_message ) self._event_thread.start() diff --git a/src/textual/events.py b/src/textual/events.py index 6711f46a93..e8fa1fa9e8 100644 --- a/src/textual/events.py +++ b/src/textual/events.py @@ -115,6 +115,7 @@ def __init__( size: Size, virtual_size: Size, container_size: Size | None = None, + pixel_size: Size | None = None, ) -> None: self.size = size """The new size of the Widget.""" @@ -122,15 +123,33 @@ def __init__( """The virtual size (scrollable size) of the Widget.""" self.container_size = size if container_size is None else container_size """The size of the Widget's container widget.""" + self.pixel_size = pixel_size + """Size of terminal window in pixels if known, or `None` if not known.""" super().__init__() + @classmethod + def from_dimensions( + cls, cells: tuple[int, int], pixels: tuple[int, int] | None + ) -> Resize: + """Construct from basic dimensions. + + Args: + cells: tuple of (, ) in cells. + pixels: tuple of (, ) in pixels if known, or `None` if not known. + + """ + size = Size(*cells) + pixel_size = Size(*pixels) if pixels is not None else None + return Resize(size, size, size, pixel_size) + def can_replace(self, message: "Message") -> bool: return isinstance(message, Resize) def __rich_repr__(self) -> rich.repr.Result: yield "size", self.size - yield "virtual_size", self.virtual_size + yield "virtual_size", self.virtual_size, self.size yield "container_size", self.container_size, self.size + yield "pixel_size", self.pixel_size, None class Compose(Event, bubble=False, verbose=True): diff --git a/src/textual/messages.py b/src/textual/messages.py index 3041b98675..427d8597c8 100644 --- a/src/textual/messages.py +++ b/src/textual/messages.py @@ -97,3 +97,42 @@ class TerminalSupportsSynchronizedOutput(Message): Used to make the App aware that the terminal emulator supports synchronised output. @link https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036 """ + + +@rich.repr.auto +class TerminalSupportInBandWindowResize(Message): + """Reports if the in-band window resize protocol is supported. + + https://gist.github.com/rockorager/e695fb2924d36b2bcf1fff4a3704bd83""" + + def __init__(self, supported: bool, enabled: bool) -> None: + """Initialize message. + + Args: + supported: Is the protocol supported? + enabled: Is the protocol enabled. + """ + self.supported = supported + self.enabled = enabled + super().__init__() + + def __rich_repr__(self) -> rich.repr.Result: + yield "supported", self.supported + yield "enabled", self.enabled + + @classmethod + def from_setting_parameter( + cls, setting_parameter: int + ) -> TerminalSupportInBandWindowResize: + """Construct the message from the setting parameter. + + Args: + setting_parameter: Setting parameter from stdin. + + Returns: + New TerminalSupportInBandWindowResize instance. + """ + + supported = setting_parameter not in (0, 4) + enabled = setting_parameter in (1, 3) + return TerminalSupportInBandWindowResize(supported, enabled) diff --git a/tests/test_driver.py b/tests/test_driver.py index e3b5feba81..0c37f6c86f 100644 --- a/tests/test_driver.py +++ b/tests/test_driver.py @@ -18,8 +18,8 @@ def handle(self, event): app = MyApp() async with app.run_test() as pilot: - app._driver.process_event(MouseDown(0, 0, 0, 0, 1, False, False, False)) - app._driver.process_event(MouseUp(0, 0, 0, 0, 1, False, False, False)) + app._driver.process_message(MouseDown(0, 0, 0, 0, 1, False, False, False)) + app._driver.process_message(MouseUp(0, 0, 0, 0, 1, False, False, False)) await pilot.pause() assert len(app.messages) == 3 assert isinstance(app.messages[0], MouseDown) @@ -41,8 +41,8 @@ def on_button_pressed(self, event): app = MyApp() async with app.run_test() as pilot: - app._driver.process_event(MouseDown(0, 0, 0, 0, 1, False, False, False)) - app._driver.process_event(MouseUp(0, 0, 0, 0, 1, False, False, False)) + app._driver.process_message(MouseDown(0, 0, 0, 0, 1, False, False, False)) + app._driver.process_message(MouseUp(0, 0, 0, 0, 1, False, False, False)) await pilot.pause() assert len(app.messages) == 1 @@ -69,8 +69,8 @@ def on_button_pressed(self, event): assert (width, height) == (button_width, button_height) # Mouse down on the button, then move the mouse inside the button, then mouse up. - app._driver.process_event(MouseDown(0, 0, 0, 0, 1, False, False, False)) - app._driver.process_event( + app._driver.process_message(MouseDown(0, 0, 0, 0, 1, False, False, False)) + app._driver.process_message( MouseUp( button_width - 1, button_height - 1, @@ -108,8 +108,8 @@ def on_button_pressed(self, event): assert (width, height) == (button_width, button_height) # Mouse down on the button, then move the mouse outside the button, then mouse up. - app._driver.process_event(MouseDown(0, 0, 0, 0, 1, False, False, False)) - app._driver.process_event( + app._driver.process_message(MouseDown(0, 0, 0, 0, 1, False, False, False)) + app._driver.process_message( MouseUp( button_width + 1, button_height + 1, diff --git a/tests/test_xterm_parser.py b/tests/test_xterm_parser.py index 4e0168b921..eb441e432b 100644 --- a/tests/test_xterm_parser.py +++ b/tests/test_xterm_parser.py @@ -91,7 +91,7 @@ def test_cant_match_escape_sequence_too_long(parser): """The sequence did not match, and we hit the maximum sequence search length threshold, so each character should be issued as a key-press instead. """ - sequence = "\x1b[123456789123456789123" + sequence = "\x1b[123456789123456789123123456789123456789123" events = list(parser.feed(sequence)) # Every character in the sequence is converted to a key press