In [None]:
#|hide
#|default_exp shell

# shell

> A shell for running notebook code without a notebook server

In [None]:
#|export
from fastcore.utils import *
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.displayhook import DisplayHook
from IPython.core.displaypub import DisplayPublisher
from io import StringIO

from execnb.fastshell import FastInteractiveShell
from execnb.nbio import *

In [None]:
#|export
# IPython requires a DisplayHook and DisplayPublisher
# We override `__call__` and `publish` to save outputs instead of printing them

class _CaptureHook(DisplayHook):
    "Called when displaying a result"
    def __call__(self, result=None):
        if result is None: return
        self.fill_exec_result(result)
        self.shell._result(result)

class _CapturePub(DisplayPublisher):
    "Called when adding an output"
    def publish(self, data, metadata=None, **kwargs): self.shell._add_out(data, metadata, typ='display_data')

In [None]:
#|export
# These are the standard notebook formats for exception and stream data (e.g stdout)
def _out_exc(ename, evalue, traceback): return dict(ename=str(ename), evalue=str(evalue), output_type='error', traceback=traceback)
def _out_stream(text): return dict(name='stdout', output_type='stream', text=text.splitlines(False))

## CaptureShell -

In [None]:
#|export
class CaptureShell(FastInteractiveShell):
    "Execute the IPython/Jupyter source code"
    def __init__(self):
        super().__init__(displayhook_class=_CaptureHook, display_pub_class=_CapturePub)
        InteractiveShell._instance = self
        self.result,self.out,self.count = None,[],1
        self.run_cell('%matplotlib inline')

    def enable_gui(self, gui=None): pass

    def _showtraceback(self, etype, evalue, stb: str):
        self.out.append(_out_exc(etype, evalue, stb))
        self.exc = (etype, evalue, '\n'.join(stb))

    def _add_out(self, data, meta, typ='execute_result', **kwargs): self.out.append(dict(data=data, metadata=meta, output_type=typ, **kwargs))

    def _add_exec(self, result, meta, typ='execute_result'):
        fd = {k:v.splitlines(True) for k,v in result.items()}
        self._add_out(fd, meta, execution_count=self.count)
        self.count += 1

    def _result(self, result):
        self.result = result
        self._add_exec(*self.display_formatter.format(result))

    def _stream(self, std):
        text = std.getvalue()
        if text: self.out.append(_out_stream(text))

In [None]:
s = CaptureShell()

### Cells -

In [None]:
#|export
@patch
def run(self:CaptureShell, code:str, stdout=True, stderr=True):
    "runs `code`, returning a list of all outputs in Jupyter notebook format"
    self.exc = False
    self.out.clear()
    self.sys_stdout,self.sys_stderr = sys.stdout,sys.stderr
    if stdout: stdout = sys.stdout = StringIO()
    if stderr: stderr = sys.stderr = StringIO()
    try: self.run_cell(code)
    finally: sys.stdout,sys.stderr = self.sys_stdout,self.sys_stderr
    self._stream(stdout)
    return [*self.out]

In [None]:
s.run("print(1)")

[{'name': 'stdout', 'output_type': 'stream', 'text': ['1']}]

Code can include magics and `!` shell commands:

In [None]:
s.run("%time 1+1")

[{'data': {'text/plain': ['2']},
  'metadata': {},
  'output_type': 'execute_result',
  'execution_count': 1},
 {'name': 'stdout',
  'output_type': 'stream',
  'text': ['CPU times: user 1 us, sys: 0 ns, total: 1 us',
   'Wall time: 3.34 us']}]

The result of the last successful execution is stored in `result`:

In [None]:
s.result

2

If an exception is raised then the exception type, object, and stacktrace are stored in `exc`:

In [None]:
s.run('raise Exception("Oops")')
typ,obj,st = s.exc
typ,obj

(Exception, Exception('Oops'))

In [None]:
print(st)

[0;31m---------------------------------------------------------------------------[0m
[0;31mException[0m                                 Traceback (most recent call last)
Input [0;32mIn [1][0m, in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0m [38;5;28;01mraise[39;00m [38;5;167;01mException[39;00m([38;5;124m"[39m[38;5;124mOops[39m[38;5;124m"[39m)

[0;31mException[0m: Oops


In [None]:
#|export
@patch
def cell(self:CaptureShell, cell, stdout=True, stderr=True):
    "Run `cell`, skipping if not code, and store outputs back in cell"
    if cell.cell_type!='code': return
    outs = self.run(cell.source)
    if outs:
        cell.outputs = outs
        for o in outs:
            if 'execution_count' in o: cell['execution_count'] = o['execution_count']

In [None]:
clean = Path('../tests/clean.ipynb')
nb = read_nb(clean)
c = nb.cells[1]
c

```json
{ 'cell_type': 'code',
  'execution_count': None,
  'id': 'b123d6d0',
  'idx_': 1,
  'metadata': {},
  'outputs': [],
  'source': 'print(1)\n2'}
```

In [None]:
s.cell(c)
c.outputs

[{'data': {'text/plain': ['2']},
  'metadata': {},
  'output_type': 'execute_result',
  'execution_count': 2},
 {'name': 'stdout', 'output_type': 'stream', 'text': ['1']}]

### NBs -

In [None]:
#|export
@patch
def run_all(self:CaptureShell, nb, exc_stop=False, preproc=noop, postproc=noop):
    "Run all cells in `nb`, stopping at first exception if `exc_stop`"
    for cell in nb.cells:
        if not preproc(cell):
            self.cell(cell)
            postproc(cell)
        if self.exc and exc_stop: raise self.exc[1] from None

In [None]:
nb.cells[2].outputs

(#0) []

In [None]:
s.run_all(nb)
nb.cells[2].outputs

(#0) []

With `exc_stop=False` (the default), execution continues after exceptions, and exception details are stored into the appropriate cell's output:

In [None]:
nb.cells[-1].source

'raise Exception("Oopsie!")'

In [None]:
nb.cells[-1].outputs

(#0) []

With `exc_stop=True` (the default), exceptions in a cell are raised and no further processing occurs:

In [None]:
try: s.run_all(nb, exc_stop=True)
except Exception as e: print(f"got exception: {e}")

We can pass a function to `preproc` to have it run on every cell. It can modify the cell as needed. If the function returns `True`, then that cell will not be executed. For instance, to skip the cell which raises an exception:

In [None]:
nb = read_nb(clean)
s.run_all(nb, preproc=lambda c: 'raise' in c.source)

This cell will contain no output, since it was skipped.

In [None]:
nb.cells[-1].outputs

(#0) []

You can also pass a function to `postproc` to modify a cell after it is executed.

In [None]:
#|export
@patch
def execute(self:CaptureShell, src, dest, exc_stop=False, preproc=noop, postproc=noop):
    "Execute notebook from `src` and save with outputs to `dest"
    nb = read_nb(src)
    self.run_all(nb, exc_stop=exc_stop, preproc=preproc, postproc=postproc)
    write_nb(nb, dest)

This is a shortcut for the combination of `read_nb`, `run_all`, and `write_nb`.

In [None]:
try:
    s.execute(clean, 'tmp.ipynb')
    print(read_nb('tmp.ipynb').cells[1].outputs)
finally: Path('tmp.ipynb').unlink()

[{'data': {'text/plain': ['2']}, 'execution_count': 10, 'metadata': {}, 'output_type': 'execute_result'}, {'name': 'stdout', 'output_type': 'stream', 'text': ['1']}]


## export -

In [None]:
#|hide
#|eval: false
from nbprocess.doclinks import nbprocess_export
nbprocess_export()