diff --git a/mathicsscript/__main__.py b/mathicsscript/__main__.py
index b88e6b2..1ba232a 100755
--- a/mathicsscript/__main__.py
+++ b/mathicsscript/__main__.py
@@ -24,13 +24,12 @@
from pygments import highlight
from mathicsscript.asymptote import asymptote_version
+from mathicsscript.interrupt import setup_signal_handler
from mathicsscript.settings import definitions
from mathicsscript.termshell import ShellEscapeException, mma_lexer
from mathicsscript.termshell_gnu import TerminalShellGNUReadline
-from mathicsscript.termshell_prompt import (
- TerminalShellCommon,
- TerminalShellPromptToolKit,
-)
+from mathicsscript.termshell import TerminalShellCommon
+from mathicsscript.termshell_prompt import TerminalShellPromptToolKit
from mathicsscript.version import __version__
try:
@@ -123,7 +122,7 @@ def load_settings(shell):
continue
evaluation.evaluate(query)
except KeyboardInterrupt:
- print("\nKeyboardInterrupt")
+ shell.errmsg("\nKeyboardInterrupt")
return True
@@ -145,16 +144,17 @@ def interactive_eval_loop(
shell: TerminalShellCommon,
unicode,
prompt,
- matplotlib: bool,
- asymptote: bool,
strict_wl_output: bool,
):
+ setup_signal_handler()
+
def identity(x: Any) -> Any:
return x
def fmt_fun(query: Any) -> Any:
return highlight(str(query), mma_lexer, shell.terminal_formatter)
+ shell.fmt_fn = fmt_fun
while True:
try:
if have_readline and shell.using_readline:
@@ -173,6 +173,11 @@ def fmt_fun(query: Any) -> Any:
fmt = fmt_fun
evaluation = Evaluation(shell.definitions, output=TerminalOutput(shell))
+
+ # Store shell into the evaluation so that an interrupt handler
+ # has access to this
+ evaluation.shell = shell
+
query, source_code = evaluation.parse_feeder_returning_code(shell)
if mathics_core.PRE_EVALUATION_HOOK is not None:
mathics_core.PRE_EVALUATION_HOOK(query, evaluation)
@@ -214,7 +219,7 @@ def fmt_fun(query: Any) -> Any:
try:
print(open(source_code[2:], "r").read())
except Exception:
- print(str(sys.exc_info()[1]))
+ shell.errmsg(str(sys.exc_info()[1]))
else:
subprocess.run(source_code[1:], shell=True)
@@ -224,13 +229,13 @@ def fmt_fun(query: Any) -> Any:
# shell.definitions.increment_line(1)
except KeyboardInterrupt:
- print("\nKeyboardInterrupt")
+ shell.errmsg("\nKeyboardInterrupt")
except EOFError:
if prompt:
- print("\n\nGoodbye!\n")
+ shell.errmsg("\n\nGoodbye!\n")
break
except SystemExit:
- print("\n\nGoodbye!\n")
+ shell.errmsg("\n\nGoodbye!\n")
# raise to pass the error code on, e.g. Quit[1]
raise
finally:
@@ -526,9 +531,7 @@ def main(
)
definitions.set_line_no(0)
- interactive_eval_loop(
- shell, charset, prompt, asymptote, matplotlib, strict_wl_output
- )
+ interactive_eval_loop(shell, charset, prompt, strict_wl_output)
return exit_rc
diff --git a/mathicsscript/completion.py b/mathicsscript/completion.py
index c70cc4a..eb9d439 100644
--- a/mathicsscript/completion.py
+++ b/mathicsscript/completion.py
@@ -23,7 +23,12 @@
from mathics.core.symbols import strip_context
from mathics_scanner import named_characters
from mathics_pygments.lexer import Regex
-from prompt_toolkit.completion import CompleteEvent, Completion, WordCompleter
+from prompt_toolkit.completion import (
+ CompleteEvent,
+ Completer,
+ Completion,
+ WordCompleter,
+)
from prompt_toolkit.document import Document
SYMBOLS = rf"[`]?({Regex.IDENTIFIER}|{Regex.NAMED_CHARACTER})(`({Regex.IDENTIFIER}|{Regex.NAMED_CHARACTER}))+[`]?"
@@ -54,6 +59,28 @@ def get_datadir():
return osp.realpath(datadir)
+class InterruptCompleter(Completer):
+ """
+ Completer for the simple command set: 'continue', 'abort', 'exit', 'show'.
+ """
+
+ COMMANDS = [
+ "abort",
+ "continue",
+ "exit",
+ "inspect",
+ "show",
+ ]
+
+ def get_completions(
+ self, document: Document, complete_event
+ ) -> Iterable[Completion]:
+ word = document.get_word_before_cursor()
+ for cmd in self.COMMANDS:
+ if cmd.startswith(word):
+ yield Completion(cmd, -len(word))
+
+
class MathicsCompleter(WordCompleter):
def __init__(self, definitions):
self.definitions = definitions
diff --git a/mathicsscript/format.py b/mathicsscript/format.py
index 56dc854..5d6af4b 100644
--- a/mathicsscript/format.py
+++ b/mathicsscript/format.py
@@ -11,6 +11,7 @@
from mathics.core.atoms import String
from mathics.core.symbols import Symbol
from mathics.core.systemsymbols import (
+ SymbolAborted,
SymbolExport,
SymbolExportString,
SymbolFullForm,
@@ -21,6 +22,7 @@
SymbolOutputForm,
SymbolPlot,
SymbolStandardForm,
+ SymbolStringForm,
SymbolTeXForm,
)
from mathics.session import get_settings_value
@@ -61,7 +63,7 @@
def format_output(obj, expr, format=None):
"""
- Handle unformatted output using the *specific* capabilities of mathics-django.
+ Handle unformatted output using the *specific* capabilities of mathicsscript
evaluation.py format_output() from which this was derived is similar but
it can't make use of a front-ends specific capabilities.
@@ -161,7 +163,15 @@ def eval_boxes(result, fn: Callable, obj, **options):
write_asy_and_view(asy_str)
return expr_type
+ if expr is SymbolAborted:
+ obj.out = ["$Aborted"]
+ obj.last_eval = SymbolAborted
+ return "$Aborted"
if format == "text":
+ if expr_head is SymbolStringForm:
+ return expr.elements[0].value
+ elif isinstance(expr, String):
+ return expr.value
result = expr.format(obj, SymbolOutputForm)
elif format == "xml":
result = Expression(SymbolStandardForm, expr).format(obj, SymbolMathMLForm)
diff --git a/mathicsscript/interrupt.py b/mathicsscript/interrupt.py
new file mode 100644
index 0000000..7193564
--- /dev/null
+++ b/mathicsscript/interrupt.py
@@ -0,0 +1,232 @@
+"""
+mathicsscript Interrupt routines.
+
+Note: other environments may build on or use other interrupt handlers
+"""
+
+import signal
+import subprocess
+import sys
+from types import FrameType
+from typing import Callable, Optional
+
+from mathics import settings
+from mathics.core.evaluation import Evaluation
+from mathics.core.interrupt import AbortInterrupt, ReturnInterrupt, TimeoutInterrupt
+from mathics.eval.stackframe import find_Mathics3_evaluation_method, get_eval_Expression
+
+
+# See also __main__'s interactive_eval_loop
+def inspect_eval_loop(evaluation: Evaluation):
+ """
+ A read eval/loop for an Interrupt's "inspect" command.
+ """
+ shell = evaluation.shell
+ if shell is not None:
+ was_inside_interrupt = shell.is_inside_interrupt
+ shell.is_inside_interrupt = True
+ else:
+ was_inside_interrupt = False
+
+ previous_recursion_depth = evaluation.recursion_depth
+ while True:
+ try:
+ # Reset line number within an In[] line number.
+ # Note: this is not setting as, say, In[5]
+ # to back to In[1], but instead it sets the line number position *within*
+ # In[5]. The user input for "In[5]" might have several continuation lines.
+ if shell is not None and hasattr(shell, "lineno"):
+ shell.lineno = 0
+
+ query, source_code = evaluation.parse_feeder_returning_code(shell)
+ # show_echo(source_code, evaluation)
+ if len(source_code) and source_code[0] == "!" and shell is not None:
+ subprocess.run(source_code[1:], shell=True)
+ if shell.definitions is not None:
+ shell.definitions.increment_line_no(1)
+ continue
+ if query is None:
+ continue
+ result = evaluation.evaluate(query, timeout=settings.TIMEOUT)
+ if result is not None and shell is not None:
+ shell.print_result(result, prompt=False, strict_wl_output=True)
+ except TimeoutInterrupt:
+ print("\nTimeout occurred - ignored.")
+ pass
+ except ReturnInterrupt:
+ evaluation.last_eval = None
+ evaluation.exc_result = None
+ evaluation.message("Interrupt", "dgend")
+ raise
+ except KeyboardInterrupt:
+ print("\nKeyboardInterrupt")
+ except EOFError:
+ print()
+ raise
+ except SystemExit:
+ # raise to pass the error code on, e.g. Quit[1]
+ raise
+ finally:
+ evaluation.recursion_depth = previous_recursion_depth
+ if shell is not None:
+ shell.is_inside_interrupt = was_inside_interrupt
+
+
+def Mathics3_interrupt_handler(
+ evaluation: Optional[Evaluation], interrupted_frame: FrameType, print_fn: Callable
+):
+
+ shell = evaluation.shell
+ incolors = shell.incolors
+ is_gnu_readline = False
+ if hasattr(shell, "bottom_toolbar"):
+ from mathicsscript.completion import InterruptCompleter
+
+ is_prompt_toolkit = True
+ use_HTML = True
+ completer = InterruptCompleter()
+ else:
+ is_prompt_toolkit = False
+ is_gnu_readline = shell.using_readline
+ use_HTML = False
+ from readline import set_completer
+
+ set_completer(lambda text, state: shell.complete_interrupt_command(text, state))
+ completer = None
+
+ while True:
+ try:
+ prompt = (
+ "interrupt> "
+ if is_prompt_toolkit
+ else f"{incolors[0]}interrupt> {incolors[3]}"
+ )
+ user_input = shell.read_line(prompt, completer, use_HTML).strip()
+ if user_input in ("a", "abort"):
+ print_fn("aborting")
+ raise AbortInterrupt
+ elif user_input in ("continue", "c"):
+ print_fn("continuing")
+ break
+ elif user_input in ("exit", "quit"):
+ print_fn("Mathics3 exited because of an interrupt.")
+ sys.exit(3)
+ elif user_input in ("inspect", "i"):
+ print_fn("inspecting")
+ if evaluation is not None:
+ evaluation.message("Interrupt", "dgbgn")
+ inspect_eval_loop(evaluation)
+
+ elif user_input in ("show", "s"):
+ # In some cases we can better, by going back to the caller
+ # and reconstructing the actual call with arguments.
+ eval_frame = find_Mathics3_evaluation_method(interrupted_frame)
+ if eval_frame is None:
+ continue
+ eval_method_name = eval_frame.f_code.co_name
+ eval_method = getattr(eval_frame.f_locals.get("self"), eval_method_name)
+ if eval_method:
+ print_fn(eval_method.__doc__)
+ eval_expression = get_eval_Expression()
+ if eval_expression is not None:
+ print_fn(shell.fmt_fn(eval_expression))
+ break
+ elif user_input in ("trace", "t"):
+ print_fn("tracing")
+ else:
+ print_fn(
+ """Your options are:
+ abort (or a) to abort current calculation
+ continue (or c) to continue
+ exit (or quit) to exit Mathics3
+ inspect (or i) to enter an interactive dialog
+ show (or s) to show current operation (and then continue)
+"""
+ )
+ except KeyboardInterrupt:
+ print_fn("\nKeyboardInterrupt")
+ except EOFError:
+ print_fn("")
+ break
+ except TimeoutInterrupt:
+ # There might have been a Pause[] set before we entered
+ # this handler. If that happens, we can clear the
+ # error. Ideally the interrupt REPL would would have clear
+ # all timeout signals, but Python doesn't support that, as
+ # far as I know.
+ #
+ # Here, we note we have time'd out. This also silences
+ # other handlers that we've handled this.
+ if evaluation is not None:
+ evaluation.timeout = True
+ break
+ except ReturnInterrupt:
+ # the interrupt shell probably isssued a Return[].
+ # Respect that.
+ break
+ except RuntimeError:
+ break
+ finally:
+ if is_gnu_readline:
+ from readline import set_completer
+
+ set_completer(
+ lambda text, state: shell.complete_symbol_name(text, state)
+ )
+ pass
+
+
+def Mathics3_basic_signal_handler(sig: int, interrupted_frame: Optional[FrameType]):
+ """
+ Custom signal handler for SIGINT (Ctrl+C).
+ """
+ evaluation: Optional[Evaluation] = None
+ # Find an evaluation object to pass to the Mathics3 interrupt handler
+ while interrupted_frame is not None:
+ if (
+ evaluation := interrupted_frame.f_locals.get("evaluation")
+ ) is not None and isinstance(evaluation, Evaluation):
+ break
+ interrupted_frame = interrupted_frame.f_back
+ print_fn = evaluation.print_out if evaluation is not None else print
+ print_fn("")
+ if interrupted_frame is None:
+ print("Unable to find Evaluation frame to start on")
+ Mathics3_interrupt_handler(evaluation, interrupted_frame, print_fn)
+
+
+def Mathics3_USR1_signal_handler(sig: int, interrupted_frame: Optional[FrameType]):
+ """
+ Custom signal handler for SIGUSR1. When we get this signal, try to
+ find an Expression that is getting evaluated, and print that. Then
+ continue.
+ """
+ shell = None
+ print_fn = print
+ while interrupted_frame is not None:
+ if (
+ evaluation := interrupted_frame.f_locals.get("evaluation")
+ ) is not None and isinstance(evaluation, Evaluation):
+ print_fn = evaluation.shell.errmsg
+ break
+ interrupted_frame = interrupted_frame.f_back
+
+ print_fn(f"USR1 ({sig}) interrupt")
+ if (eval_expression := get_eval_Expression()) is not None:
+ # If eval string is long, take just the first 100 characters
+ # of it.
+ if shell is not None:
+ eval_expression_str = shell.fmt_fn(eval_expression)
+ else:
+ eval_expression_str = str(eval_expression)
+
+ if len(eval_expression_str) > 100:
+ eval_expression_str = eval_expression_str[:100] + "..."
+
+ print(f"Expression: {eval_expression_str}")
+
+
+def setup_signal_handler():
+ signal.signal(signal.SIGINT, Mathics3_basic_signal_handler)
+ if hasattr(signal, "SIGUSR1"):
+ signal.signal(signal.SIGUSR1, Mathics3_USR1_signal_handler)
diff --git a/mathicsscript/termshell.py b/mathicsscript/termshell.py
index e4a3f5c..f467308 100644
--- a/mathicsscript/termshell.py
+++ b/mathicsscript/termshell.py
@@ -6,7 +6,7 @@
import os.path as osp
import pathlib
import sys
-from typing import Optional
+from typing import Any, Optional, Union
import mathics_scanner.location
@@ -42,7 +42,7 @@
# Set up mathicsscript configuration directory
CONFIGHOME = os.environ.get("XDG_CONFIG_HOME", osp.expanduser("~/.config"))
-CONFIGDIR = osp.join(CONFIGHOME, "mathicsscript")
+CONFIGDIR = osp.join(CONFIGHOME, "Mathics3")
os.makedirs(CONFIGDIR, exist_ok=True)
try:
@@ -50,8 +50,8 @@
except Exception:
HISTSIZE = 50
-HISTFILE = os.environ.get("MATHICS_HISTFILE", osp.join(CONFIGDIR, "history"))
-USER_INPUTRC = os.environ.get("MATHICS_INPUTRC", osp.join(CONFIGDIR, "inputrc"))
+HISTFILE = os.environ.get("MATHICS3_HISTFILE", osp.join(CONFIGDIR, "history"))
+USER_INPUTRC = os.environ.get("MATHICS3_INPUTRC", osp.join(CONFIGDIR, "inputrc"))
# Create HISTFILE if it doesn't exist already
if not osp.isfile(HISTFILE):
@@ -86,6 +86,11 @@ def __init__(
):
super().__init__([], ContainerKind.STREAM)
self.input_encoding = locale.getpreferredencoding()
+
+ # is_inside_interrupt is set True when shell has been
+ # interrupted via an interrupt handler.
+ self.is_inside_interrupt = False
+
self.lineno = 0
self.terminal_formatter = None
self.prompt = prompt
@@ -159,11 +164,36 @@ def change_pygments_style(self, style: str):
print("Pygments style not changed")
return False
- def get_last_line_number(self):
- return self.definitions.get_line_no()
+ def empty(self):
+ return False
+
+ def errmsg(self, message: str):
+ print(f"{self.outcolors[0]}{message}{self.outcolors[3]}")
+ return
- def get_in_prompt(self):
- next_line_number = self.get_last_line_number() + 1
+ def feed(self):
+ prompt_str = self.in_prompt if self.prompt else ""
+ result = self.read_line(prompt_str) + "\n"
+ if mathics_scanner.location.TRACK_LOCATIONS and self.source_text is not None:
+ self.container.append(self.source_text)
+ if result == "\n":
+ return "" # end of input
+ self.lineno += 1
+ return result
+
+ # prompt-toolkit returns a HTML object. Therefore, we include Any
+ # to cover that.
+ def get_out_prompt(self, form: str) -> Union[str, Any]:
+ """
+ Return a formatted "Out" string prefix. ``form`` is either the empty string if the
+ default form, or the name of the Form which was used in output preceded by "//"
+ """
+ line_number = self.last_line_number
+ return "{2}Out[{3}{0}{4}]{5}{1}= ".format(line_number, form, *self.outcolors)
+
+ @property
+ def in_prompt(self) -> Union[str, Any]:
+ next_line_number = self.last_line_number + 1
if self.lineno > 0:
return " " * len(f"In[{next_line_number}]:= ")
else:
@@ -173,26 +203,17 @@ def get_in_prompt(self):
# else:
# return f"In[{next_line_number}]:= "
- def get_out_prompt(self, form: str) -> str:
+ @property
+ def last_line_number(self) -> int:
"""
- Return a formatted "Out" string prefix. ``form`` is either the empty string if the
- default form, or the name of the Form which was used in output preceded by "//"
+ Return the next Out[] line number
"""
- line_number = self.get_last_line_number()
- return "{2}Out[{3}{0}{4}]{5}{1}= ".format(line_number, form, *self.outcolors)
-
- def to_output(self, text: str, form: str) -> str:
- """
- Format an 'Out=' line that it lines after the first one indent properly.
- """
- line_number = self.get_last_line_number()
- newline = "\n" + " " * len(f"Out[{line_number}]{form}= ")
- return newline.join(text.splitlines())
+ return self.definitions.get_line_no()
def out_callback(self, out):
print(self.to_output(str(out), form=""))
- def read_line(self, prompt):
+ def read_line(self, prompt, completer=None, use_html=None):
if self.using_readline:
line = self.rl_read_line(prompt)
else:
@@ -275,15 +296,10 @@ def rl_read_line(self, prompt):
def reset_lineno(self):
self.lineno = 0
- def feed(self):
- prompt_str = self.get_in_prompt() if self.prompt else ""
- result = self.read_line(prompt_str) + "\n"
- if mathics_scanner.location.TRACK_LOCATIONS and self.source_text is not None:
- self.container.append(self.source_text)
- if result == "\n":
- return "" # end of input
- self.lineno += 1
- return result
-
- def empty(self):
- return False
+ def to_output(self, text: str, form: str) -> str:
+ """
+ Format an 'Out=' line that it lines after the first one indent properly.
+ """
+ line_number = self.last_line_number
+ newline = "\n" + " " * len(f"Out[{line_number}]{form}= ")
+ return newline.join(text.splitlines())
diff --git a/mathicsscript/termshell_gnu.py b/mathicsscript/termshell_gnu.py
index b3f0624..8eb9511 100644
--- a/mathicsscript/termshell_gnu.py
+++ b/mathicsscript/termshell_gnu.py
@@ -39,6 +39,16 @@ def read_init_file(_: str):
except ImportError:
have_full_readline = False
+ def null_fn(*_):
+ return
+
+ def write_history_file(_: str):
+ return
+
+ parse_and_bind = read_history_file = set_history_length = null_fn
+ set_completer = set_completer_delims = null_fn
+
+
RL_COMPLETER_DELIMS_WITH_BRACE = " \t\n_~!@#%^&*()-=+{]}|;:'\",<>/?"
RL_COMPLETER_DELIMS = " \t\n_~!@#%^&*()-=+[{]}\\|;:'\",<>/?"
@@ -93,6 +103,10 @@ def __init__(
# History
try:
read_history_file(HISTFILE)
+ except FileNotFoundError:
+ # Create an empty history file.
+ with open(HISTFILE, "w"):
+ pass
except IOError:
pass
except: # noqa
@@ -102,6 +116,18 @@ def __init__(
set_history_length(self.history_length)
atexit.register(self.user_write_history_file)
+ def complete_interrupt_command(self, text, state):
+ # Only complete from this fixed set
+ completions = [
+ w
+ for w in ["abort", "continue", "exit", "inspect", "show"]
+ if w.startswith(text)
+ ]
+ try:
+ return completions[state]
+ except IndexError:
+ return None
+
def complete_symbol_name(self, text, state):
try:
match = re.match(r"^(.*\\\[)([A-Z][a-z]*)$", text)
diff --git a/mathicsscript/termshell_prompt.py b/mathicsscript/termshell_prompt.py
index 036ace6..f35925a 100644
--- a/mathicsscript/termshell_prompt.py
+++ b/mathicsscript/termshell_prompt.py
@@ -6,7 +6,7 @@
import os.path as osp
import re
import sys
-from typing import Optional
+from typing import Optional, Union
from colorama import init as colorama_init
from mathics.core.atoms import String
@@ -69,6 +69,11 @@ def __init__(
):
super(TerminalShellCommon, self).__init__([], ContainerKind.STREAM)
self.input_encoding = locale.getpreferredencoding()
+
+ # is_inside_interrupt is set True when shell has been
+ # interrupted via an interrupt handler.
+ self.is_inside_interrupt = False
+
self.lineno = 0
self.terminal_formatter = None
self.mma_pygments_lexer = PygmentsLexer(MathematicaLexer)
@@ -208,22 +213,26 @@ def bottom_toolbar(self):
f" mathicsscript: {__version__}, Style: {app.pygments_style}, Mode: {edit_mode}, Autobrace: {app.group_autocomplete}"
)
- def get_in_prompt(self):
- next_line_number = self.get_last_line_number() + 1
- if self.lineno > 0:
- return " " * len(f"In[{next_line_number}]:= ")
- else:
- return HTML(f"In[{next_line_number}]:= ")
+ def errmsg(self, message: str):
+ print_formatted_text(HTML(f"{message}"))
- def get_out_prompt(self, form: str) -> str:
+ def get_out_prompt(self, form: str) -> Union[str, HTML]:
"""
Return a formatted "Out" string prefix. ``form`` is either the empty string if the
default form, or the name of the Form which was used in output if it wasn't then
default form.
"""
- line_number = self.get_last_line_number()
+ line_number = self.last_line_number
return HTML(f"Out[{line_number}]{form}= ")
+ @property
+ def in_prompt(self) -> Union[str, HTML]:
+ next_line_number = self.last_line_number + 1
+ if self.lineno > 0:
+ return " " * len(f"In[{next_line_number}]:= ")
+ else:
+ return HTML(f"In[{next_line_number}]:= ")
+
def print_result(
self, result, prompt: bool, output_style="", strict_wl_output=False
):
@@ -287,15 +296,20 @@ def print_result(
else:
print(self.get_out_prompt() + output + "\n")
- def read_line(self, prompt):
+ def read_line(self, prompt, completer=None, use_html: bool = False):
# FIXME set and update inside self.
style = style_from_pygments_cls(get_style_by_name(self.pygments_style))
+ if completer is None:
+ completer = self.completer
+
+ if use_html:
+ prompt = HTML(prompt)
line = self.session.prompt(
prompt,
bottom_toolbar=self.bottom_toolbar,
- completer=self.completer,
+ completer=completer,
key_bindings=bindings,
lexer=self.mma_pygments_lexer,
style=style,