In [None]:
#| default_exp core

# pshnb IPython magic
> Provides `psh` persistent bash magics in Jupyter and IPython

In [None]:
#| export
from fastcore.utils import *
import pexpect, re, os
from pexpect import TIMEOUT
from pathlib import Path
from getpass import getpass
from IPython.core.magic import register_cell_magic, no_var_expand
from IPython.display import display, Javascript
from IPython.paths import get_ipython_dir
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.magic_arguments import magic_arguments, argument

In [None]:
# NOTE(review): presumably defines `__file__` by hand because it is not set inside a notebook kernel — confirm which tool needs it
__file__ = './00_core.ipynb'

In [None]:
# First experiment: spawn the user's shell under pexpect with TERM='dumb' and fixed '$' prompts in its environment.
env = dict(os.environ, TERM='dumb', PS1='$', PS2='$')
eshell = os.environ['SHELL']  # raises KeyError if SHELL is not set
sh = pexpect.spawn(eshell, encoding='utf-8', env=env)

In [None]:
# Second experiment: spawn again with empty prompts, then synchronise with the shell using a random sentinel.
env = dict(os.environ, TERM='dumb', PS1='', PS2='')
eshell = os.environ['SHELL']
sh = pexpect.spawn(eshell, encoding='utf-8', env=env)
sh.sendline('stty -echo')
sh.readline()  # reads one line of output (presumably the echoed `stty` command — confirm)
echo = os.urandom(8).hex()  # 16 random hex characters used as an end-of-output marker
# Matches a line that consists only of the sentinel, optionally followed by whitespace
echo_re = re.compile(fr'^{echo}\s*$', flags=re.MULTILINE)
sh.sendline(f'export PS1=""')
sh.sendline('set +o vi +o emacs')
sh.sendline('echo '+echo)  # make the shell print the sentinel ...
sh.expect(echo_re, timeout=2)  # ... and wait up to 2 seconds for it to show up

0

In [None]:
# Send a command followed by the sentinel; whatever arrives before the sentinel is the command's output.
sh.sendline('ls | head -3')
sh.sendline('echo '+echo)
sh.expect(echo_re, timeout=2)
print(sh.before)  # text received before the sentinel matched

00_core.ipynb
CHANGELOG.bak
CHANGELOG.md



In [None]:
#| export
class ShellInterpreter:
    "A persistent shell session driven through `pexpect`; call the instance with a command to get its output"
    def __init__(self, debug=False, timeout=2, shell_path=None, sudo=False, dumb=False):
        """Spawn the shell and set it up for sentinel-delimited output capture.

        - `debug`: if true, print each command (prefixed with `#`) before it is sent
        - `timeout`: default number of seconds `_ex`/`__call__` wait for a command
        - `shell_path`: shell to spawn; defaults to `$SHELL`, else `/bin/bash`
        - `sudo`: prefix the shell command with `sudo -i`
        - `dumb`: spawn with `TERM=dumb` rather than `TERM=xterm`
        """
        self.debug,self.timeout = debug,timeout
        if shell_path is None: shell_path = os.environ.get('SHELL', '/bin/bash')
        if sudo: shell_path = 'sudo -i ' + shell_path
        env = dict(os.environ, TERM='dumb' if dumb else 'xterm')
        self.sh = pexpect.spawn(shell_path, encoding='utf-8', env=env)
        # Switch terminal echo off, then read one line (presumably the echoed `stty` command itself -- confirm)
        self.sh.sendline('stty -echo')
        self.sh.readline()
        # Random sentinel: printed after every command so the end of its output can be recognised
        self.echo = os.urandom(8).hex()
        self.echo_re = re.compile(fr'^{self.echo}\s*$', flags=re.MULTILINE)
        # Empty prompts and no vi/emacs editing mode (presumably to keep them out of captured output -- confirm)
        self.sh.sendline('export PS1=""')
        self.sh.sendline('export PS2=""')
        self.sh.sendline('set +o vi +o emacs')
        self.wait_echo()

    def wait_echo(self, timeout=None):
        """Make the shell print the sentinel, wait for it, and return the text received before it (right-stripped).

        `timeout` is handed to `pexpect`'s `expect` unchanged.
        NOTE(review): a `None` timeout appears to mean "wait without limit" in pexpect, and that is
        what `__init__` ends up using -- confirm this is intended.
        """
        # A bare `echo` goes first (presumably so the sentinel starts on its own line -- confirm)
        self.sh.sendline('echo')
        self.sh.sendline('echo '+self.echo)
        self.sh.expect(self.echo_re, timeout=timeout)
        return self.sh.before.rstrip()

    def _ex(self, s, timeout=None):
        "Send `s` to the shell and return what was output before the sentinel; `timeout` falls back to `self.timeout`"
        if timeout is None: timeout=self.timeout
        if self.debug: print('#', s)
        self.sh.sendline(s)
        return self.wait_echo(timeout=timeout)

    def __call__(self, cmd, timeout=None):
        """Run `cmd` and return its output with trailing whitespace removed.

        Exceptions raised by `pexpect` while waiting for the sentinel (e.g. `TIMEOUT`) propagate to the caller.
        """
        # Strip once and reuse: the text removed from the output must be exactly the text that was sent
        cmd = cmd.rstrip()
        output = self._ex(cmd, timeout=timeout)
        # Drop the first copy of the command line in case the terminal echoed it back. Skipped for an
        # empty command, where the pattern would be a bare line break and would glue two output lines together.
        if cmd: output = output.replace(cmd + '\r\n', '', 1)
        return output.rstrip()

In [None]:
# Smoke test: a new interpreter runs a pipeline and returns its output as a string
sh = ShellInterpreter()
print(sh('ls | head -3'))

00_core.ipynb
CHANGELOG.bak
CHANGELOG.md


In [None]:
# With sudo=True the shell is started through `sudo -i`; the working directory carries over from one call to the next
sh = ShellInterpreter(sudo=True)

sh('cd')
print(sh('pwd'))
sh('cd ..')
print(sh('pwd'))
print(sh('whoami'))

/var/root
/var
root


In [None]:
#| export
def shell_replace(s, shell=None):
    "Substitute every `@{name}` in `s` with `str()` of that variable from `shell.user_ns`; unknown names are left as written"
    shell = shell or get_ipython()
    def _lookup(match):
        # match[0] is the whole `@{name}` token, match[1] just the name; the token is the fallback
        whole, name = match[0], match[1]
        return str(shell.user_ns.get(name, whole))
    pattern = re.compile(r'\@{(\w+?)}')
    return pattern.sub(_lookup, s)

In [None]:
# `b` exists in the user namespace, so `@{b}` is substituted; `@{aa}` has no matching variable and is left as-is
b = 1

a = '''asdf
$@{b} @{aa}
fdsa'''

print(shell_replace(a))

asdf
$1 @{aa}
fdsa


In [None]:
#| export
class PshMagic:
    "Implementation of the `psh` magic: holds the settings and a lazily created `ShellInterpreter` in `self.o`"
    # `store_attr()` (fastcore) saves `shell`, `sudo`, `timeout`, `expand` and `o` as attributes of the same name
    def __init__(self, shell, sudo=False, timeout=2, expand=True, o=None): store_attr()
    def reset(self):
        "Replace `self.o` with a new `ShellInterpreter` built from the current `sudo` and `timeout` settings"
        self.o = ShellInterpreter(sudo=self.sudo, timeout=self.timeout)
    def help (self):
        "Print the argument help of the `psh` magic"
        self.psh.parser.print_help()

    def _xpand(self, expand=False):
        "Switch `@{var}` expansion on or off"
        self.expand = expand
    def _sudo(self, sudo=False):
        "Set the sudo flag and drop the interpreter, so the next command starts a new shell with that setting"
        self.sudo = sudo
        self.o = None
    def _timeout(self, timeout=2):
        "Set the timeout and drop the interpreter, so the next command starts a new shell with that setting"
        self.timeout = timeout
        self.o = None

    @magic_arguments()
    @argument('-h', '--help',      action='store_true', help='Show this help')
    @argument('-r', '--reset',     action='store_true', help='Reset the shell interpreter')
    @argument('-o', '--obj',       action='store_true', help='Return this magic object')
    @argument('-x', '--expand',    action='store_true', help='Enable variable expansion')
    @argument('-X', '--no-expand', action='store_true', help='Disable variable expansion')
    @argument('-s', '--sudo',      action='store_true', help='Enable sudo')
    @argument('-S', '--no-sudo',   action='store_true', help='Disable sudo')
    @argument('-t', '--timeout', type=int, help='Set timeout in seconds')
    @argument('command', nargs='*', help='The command to run')
    @no_var_expand
    def psh(self, line, cell=None):
        "Run line or cell in persistent shell"
        # NB: the docstring above is shown as the description in `%psh -h`, so it is kept to one short line.
        # With neither a line nor a cell, a bare `echo` is run.
        if not cell and not line: line = 'echo'
        # Only substitute `@{var}` references while expansion is enabled, so that `-X` really turns it off
        if self.expand:
            if cell: cell = shell_replace(cell, self.shell)
            if line: line = shell_replace(line, self.shell)
        args = self.psh.parser.parse_args(line.split())
        # Option flags only change settings (or return something) and end the call; the first match wins
        if args.expand:    return self._xpand(True)
        if args.no_expand: return self._xpand(False)
        if args.sudo:      return self._sudo (True)
        if args.no_sudo:   return self._sudo (False)
        # `is not None` rather than truthiness: `-t 0` is a timeout setting, not a shell command to run
        if args.timeout is not None: return self._timeout(args.timeout)
        if args.reset:     return self.reset()
        if args.help:      return self.help()
        if args.obj:       return self
        # A command given on the magic line takes precedence over the cell body
        if args.command: cell = ' '.join(args.command)
        if not cell and line: cell=line
        disp = True
        # A trailing `;` means: run the command but do not print its output
        if cell.endswith(';'): disp,cell = False,cell[:-1]
        if not self.o: self.reset()
        try: res = self.o(cell) or None
        except Exception as e:
            # Drop the interpreter on failure so the next call starts from a fresh shell
            self.o = None
            raise e from None
        if disp and res: print(res)

In [None]:
#| export
def create_magic(shell=None):
    "Register the `psh` method of a new `PshMagic` as the `psh` line/cell magic on `shell` (default: `get_ipython()`)"
    target = shell or get_ipython()
    handler = PshMagic(target).psh
    target.register_magic_function(handler, magic_name='psh', magic_kind='line_cell')

In [None]:
# Only required if you don't load the extension
create_magic()

In [None]:
%psh pwd

/Users/jhoward/Documents/GitHub/pshnb


In [None]:
%psh cd ..

In [None]:
%psh pwd

/Users/jhoward/Documents/GitHub


In [None]:
%%psh
cat > tmp << EOF
hi
there
EOF

In [None]:
%psh cat tmp

hi
there


In [None]:
%psh rm tmp

In [None]:
%psh ls | head -3

[34mContextKit[39;49m[0m
[34mFastHTML-Gallery[39;49m[0m
[34maimagic[39;49m[0m


In [None]:
n = 2

In [None]:
%psh echo @{n}

2


In [None]:
%psh ls | head -@{n}

[34mContextKit[39;49m[0m
[34mFastHTML-Gallery[39;49m[0m


In [None]:
%%psh
echo starting
(sleep 1; echo finished) &

starting
[1] 99418


In [None]:
%psh

finished

[1]+  Done                    ( sleep 1; echo finished )


In [None]:
%psh -h

::

  %psh [-h] [-r] [-o] [-x] [-X] [-s] [-S] [-t TIMEOUT] [command ...]

Run line or cell in persistent shell

positional arguments:
  command               The command to run

options:
  -h, --help            Show this help
  -r, --reset           Reset the shell interpreter
  -o, --obj             Return this magic object
  -x, --expand          Enable variable expansion
  -X, --no-expand       Disable variable expansion
  -s, --sudo            Enable sudo
  -S, --no-sudo         Disable sudo
  -t TIMEOUT, --timeout TIMEOUT
                        Set timeout in seconds


In [None]:
%psh pwd

/Users/jhoward/Documents/GitHub


In [None]:
%psh -r

In [None]:
%psh pwd

/Users/jhoward/Documents/GitHub/pshnb


In [None]:
%psh -s

In [None]:
%psh whoami

root


In [None]:
%psh -S

In [None]:
%psh whoami

jhoward


In [None]:
%psh -t 1

In [None]:
# The timeout was set to 1 second by `%psh -t 1` above, so a 2 second sleep is expected to raise pexpect's TIMEOUT
try: get_ipython().run_line_magic('psh', 'sleep 2')
except TIMEOUT: print("timed out")

timed out


In [None]:
#|export
def load_ipython_extension(ipython):
    "Hook used when this module is loaded as an IPython extension: registers the `psh` magic on `ipython`"
    create_magic(ipython)

In [None]:
#| export
def create_ipython_config():
    "Add `pshnb.core` to the extensions in the default IPython profile's config file (called by `pshnb_install`)"
    cf = Path(get_ipython_dir()) / 'profile_default' / 'ipython_config.py'
    cf.parent.mkdir(parents=True, exist_ok=True)
    # Any mention of `pshnb` in an existing config counts as already installed
    already_installed = cf.exists() and 'pshnb' in cf.read_text()
    if already_installed:
        print('pshnb already installed!')
        return
    with cf.open(mode='a') as cfg:
        cfg.write("\nc.InteractiveShellApp.extensions.append('pshnb.core')\n\n")
    print(f"Jupyter config updated at {cf}")

## Export -

In [None]:
#|hide
#|eval: false
from nbdev.doclinks import nbdev_export
nbdev_export()