Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added

- Added support for in-band terminal resize protocol https://github.com/Textualize/textual/pull/5217

### Changed

- `Driver.process_event` is now `Driver.process_message` https://github.com/Textualize/textual/pull/5217
- `Driver.send_event` is now `Driver.send_message` https://github.com/Textualize/textual/pull/5217
- Added `can_focus` and `can_focus_children` parameters to scrollable container types. https://github.com/Textualize/textual/pull/5226
- Added `textual.lazy.Reveal` https://github.com/Textualize/textual/pull/5226
- Added `Screen.action_blur` https://github.com/Textualize/textual/pull/5226
Expand Down
26 changes: 22 additions & 4 deletions src/textual/_xterm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# When trying to determine whether the current sequence is a supported/valid
# escape sequence, at which length should we give up and consider our search
# to be unsuccessful?
_MAX_SEQUENCE_SEARCH_THRESHOLD = 20
_MAX_SEQUENCE_SEARCH_THRESHOLD = 32

_re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
_re_terminal_mode_response = re.compile(
Expand All @@ -37,6 +37,9 @@
"""Set of special sequences."""

_re_extended_key: Final = re.compile(r"\x1b\[(?:(\d+)(?:;(\d+))?)?([u~ABCDEFHPQRS])")
_re_in_band_window_resize: Final = re.compile(
r"\x1b\[48;(\d+(?:\:.*?)?);(\d+(?:\:.*?)?);(\d+(?:\:.*?)?);(\d+(?:\:.*?)?)t"
)


class XTermParser(Parser[Message]):
Expand Down Expand Up @@ -212,6 +215,16 @@ def send_escape() -> None:
elif sequence == BRACKETED_PASTE_END:
bracketed_paste = False
break
if match := _re_in_band_window_resize.fullmatch(sequence):
height, width, pixel_height, pixel_width = [
group.partition(":")[0] for group in match.groups()
]
resize_event = events.Resize.from_dimensions(
(int(width), int(height)),
(int(pixel_width), int(pixel_height)),
)
on_token(resize_event)
break

if not bracketed_paste:
# Check cursor position report
Expand Down Expand Up @@ -246,9 +259,14 @@ def send_escape() -> None:
mode_report_match = _re_terminal_mode_response.match(sequence)
if mode_report_match is not None:
mode_id = mode_report_match["mode_id"]
setting_parameter = mode_report_match["setting_parameter"]
if mode_id == "2026" and int(setting_parameter) > 0:
setting_parameter = int(mode_report_match["setting_parameter"])
if mode_id == "2026" and setting_parameter > 0:
on_token(messages.TerminalSupportsSynchronizedOutput())
elif mode_id == "2048":
in_band_event = messages.TerminalSupportInBandWindowResize.from_setting_parameter(
setting_parameter
)
on_token(in_band_event)
break

if self._debug_log_file is not None:
Expand All @@ -265,7 +283,7 @@ def _sequence_to_key_events(self, sequence: str) -> Iterable[events.Key]:
Keys
"""

if (match := _re_extended_key.match(sequence)) is not None:
if (match := _re_extended_key.fullmatch(sequence)) is not None:
number, modifiers, end = match.groups()
number = number or 1
if not (key := FUNCTIONAL_KEYS.get(f"{number}{end}", "")):
Expand Down
27 changes: 23 additions & 4 deletions src/textual/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,9 @@ def __init__(

self._hover_effects_timer: Timer | None = None

self._resize_event: events.Resize | None = None
"""A pending resize event, sent on idle."""

self._css_update_count: int = 0
"""Incremented when CSS is invalidated."""

Expand Down Expand Up @@ -1685,7 +1688,7 @@ async def _press_keys(self, keys: Iterable[str]) -> None:
char = key if len(key) == 1 else None
key_event = events.Key(key, char)
key_event.set_sender(app)
driver.send_event(key_event)
driver.send_message(key_event)
await wait_for_idle(0)
await app._animator.wait_until_complete()
await wait_for_idle(0)
Expand Down Expand Up @@ -3841,9 +3844,7 @@ async def _on_key(self, event: events.Key) -> None:

async def _on_resize(self, event: events.Resize) -> None:
event.stop()
self.screen.post_message(event)
for screen in self._background_screens:
screen.post_message(event)
self._resize_event = event

async def _on_app_focus(self, event: events.AppFocus) -> None:
"""App has focus."""
Expand Down Expand Up @@ -4473,3 +4474,21 @@ def _on_delivery_failed(self, event: events.DeliveryComplete) -> None:
self.notify(
"Failed to save screenshot", title="Screenshot", severity="error"
)

@on(messages.TerminalSupportInBandWindowResize)
def _on_terminal_supports_in_band_window_resize(
self, message: messages.TerminalSupportInBandWindowResize
) -> None:
"""There isn't much we can do with this information currently, so
we will just log it.
"""
self.log.debug(message)

def _on_idle(self) -> None:
"""Send app resize events on idle, so we don't do more resizing that necessary."""
event = self._resize_event
if event is not None:
self._resize_event = None
self.screen.post_message(event)
for screen in self._background_screens:
screen.post_message(event)
60 changes: 30 additions & 30 deletions src/textual/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Literal, TextIO

from textual import events, log
from textual import events, log, messages
from textual.events import MouseUp

if TYPE_CHECKING:
Expand Down Expand Up @@ -64,71 +64,71 @@ def can_suspend(self) -> bool:
"""Can this driver be suspended?"""
return False

def send_event(self, event: events.Event) -> None:
"""Send an event to the target app.
def send_message(self, message: messages.Message) -> None:
"""Send a message to the target app.

Args:
event: An event.
message: A message.
"""
asyncio.run_coroutine_threadsafe(
self._app._post_message(event), loop=self._loop
self._app._post_message(message), loop=self._loop
)

def process_event(self, event: events.Event) -> None:
"""Perform additional processing on an event, prior to sending.
def process_message(self, message: messages.Message) -> None:
"""Perform additional processing on a message, prior to sending.

Args:
event: An event to send.
event: A message to process.
"""
# NOTE: This runs in a thread.
# Avoid calling methods on the app.
event.set_sender(self._app)
message.set_sender(self._app)
if self.cursor_origin is None:
offset_x = 0
offset_y = 0
else:
offset_x, offset_y = self.cursor_origin
if isinstance(event, events.MouseEvent):
event.x -= offset_x
event.y -= offset_y
event.screen_x -= offset_x
event.screen_y -= offset_y

if isinstance(event, events.MouseDown):
if event.button:
self._down_buttons.append(event.button)
elif isinstance(event, events.MouseUp):
if event.button and event.button in self._down_buttons:
self._down_buttons.remove(event.button)
elif isinstance(event, events.MouseMove):
if isinstance(message, events.MouseEvent):
message.x -= offset_x
message.y -= offset_y
message.screen_x -= offset_x
message.screen_y -= offset_y

if isinstance(message, events.MouseDown):
if message.button:
self._down_buttons.append(message.button)
elif isinstance(message, events.MouseUp):
if message.button and message.button in self._down_buttons:
self._down_buttons.remove(message.button)
elif isinstance(message, events.MouseMove):
if (
self._down_buttons
and not event.button
and not message.button
and self._last_move_event is not None
):
# Deduplicate self._down_buttons while preserving order.
buttons = list(dict.fromkeys(self._down_buttons).keys())
self._down_buttons.clear()
move_event = self._last_move_event
for button in buttons:
self.send_event(
self.send_message(
MouseUp(
x=move_event.x,
y=move_event.y,
delta_x=0,
delta_y=0,
button=button,
shift=event.shift,
meta=event.meta,
ctrl=event.ctrl,
shift=message.shift,
meta=message.meta,
ctrl=message.ctrl,
screen_x=move_event.screen_x,
screen_y=move_event.screen_y,
style=event.style,
style=message.style,
)
)
self._last_move_event = event
self._last_move_event = message

self.send_event(event)
self.send_message(message)

@abstractmethod
def write(self, data: str) -> None:
Expand Down
48 changes: 46 additions & 2 deletions src/textual/drivers/linux_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from textual.driver import Driver
from textual.drivers._writer_thread import WriterThread
from textual.geometry import Size
from textual.message import Message
from textual.messages import TerminalSupportInBandWindowResize

if TYPE_CHECKING:
from textual.app import App
Expand Down Expand Up @@ -59,6 +61,7 @@ def __init__(
# need to know that we came in here via a SIGTSTP; this flag helps
# keep track of this.
self._must_signal_resume = False
self._in_band_window_resize = False

# Put handlers for SIGTSTP and SIGCONT in place. These are necessary
# to support the user pressing Ctrl+Z (or whatever the dev might
Expand Down Expand Up @@ -135,6 +138,22 @@ def _enable_bracketed_paste(self) -> None:
"""Enable bracketed paste mode."""
self.write("\x1b[?2004h")

def _query_in_band_window_resize(self) -> None:
self.write("\x1b[?2048$p")

def _enable_in_band_window_resize(self) -> None:
self.write("\x1b[?2048h")

def _enable_line_wrap(self) -> None:
self.write("\x1b[?7h")

def _disable_line_wrap(self) -> None:
self.write("\x1b[?7l")

def _disable_in_band_window_resize(self) -> None:
if self._in_band_window_resize:
self.write("\x1b[?2048l")

def _disable_bracketed_paste(self) -> None:
"""Disable bracketed paste mode."""
self.write("\x1b[?2004l")
Expand Down Expand Up @@ -197,6 +216,8 @@ def _stop_again(*_) -> None:
loop = asyncio.get_running_loop()

def send_size_event() -> None:
if self._in_band_window_resize:
return
terminal_size = self._get_terminal_size()
width, height = terminal_size
textual_size = Size(width, height)
Expand Down Expand Up @@ -253,7 +274,9 @@ def on_terminal_resize(signum, stack) -> None:
send_size_event()
self._key_thread.start()
self._request_terminal_sync_mode_support()
self._query_in_band_window_resize()
self._enable_bracketed_paste()
self._disable_line_wrap()

# Appears to fix an issue enabling mouse support in iTerm 3.5.0
self._enable_mouse_support()
Expand Down Expand Up @@ -330,6 +353,8 @@ def disable_input(self) -> None:
def stop_application_mode(self) -> None:
"""Stop application mode, restore state."""
self._disable_bracketed_paste()
self._enable_line_wrap()
self._disable_in_band_window_resize()
self.disable_input()

if self.attrs_before is not None:
Expand Down Expand Up @@ -401,9 +426,9 @@ def process_selector_events(
# This can occur if the stdin is piped
break
for event in feed(unicode_data):
self.process_event(event)
self.process_message(event)
for event in tick():
self.process_event(event)
self.process_message(event)

try:
while not self.exit_event.is_set():
Expand All @@ -418,3 +443,22 @@ def process_selector_events(
pass
except ParseError:
pass

def process_message(self, message: Message) -> None:
# intercept in-band window resize
if isinstance(message, TerminalSupportInBandWindowResize):
# If it is supported, enabled it
if message.supported and not message.enabled:
self._enable_in_band_window_resize()
self._in_band_window_resize = message.supported
elif message.enabled:
self._in_band_window_resize = message.supported
# Send up-to-date message
super().process_message(
TerminalSupportInBandWindowResize(
message.supported, self._in_band_window_resize
)
)
return

super().process_message(message)
4 changes: 2 additions & 2 deletions src/textual/drivers/linux_inline_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ def process_selector_events(
if isinstance(event, events.CursorPosition):
self.cursor_origin = (event.x, event.y)
else:
self.process_event(event)
self.process_message(event)
for event in tick():
if isinstance(event, events.CursorPosition):
self.cursor_origin = (event.x, event.y)
else:
self.process_event(event)
self.process_message(event)

try:
while not self.exit_event.is_set():
Expand Down
4 changes: 2 additions & 2 deletions src/textual/drivers/web_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ def run_input_thread(self) -> None:
if packet_type == "D":
# Treat as stdin
for event in parser.feed(decode(payload)):
self.process_event(event)
self.process_message(event)
else:
# Process meta information separately
self._on_meta(packet_type, payload)
for event in parser.tick():
self.process_event(event)
self.process_message(event)
except _ExitInput:
pass
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion src/textual/drivers/windows_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def start_application_mode(self) -> None:
self._enable_bracketed_paste()

self._event_thread = win32.EventMonitor(
loop, self._app, self.exit_event, self.process_event
loop, self._app, self.exit_event, self.process_message
)
self._event_thread.start()

Expand Down
Loading
Loading