In [1]:
# default_exp util

# install

In [1]:
# Install nbdev into the kernel's environment; the grep filter hides pip's "already satisfied" lines.
%pip install nbdev | grep -v 'already satisfied'

[0mNote: you may need to restart the kernel to use updated packages.


# python

In [3]:
#export

import uuid

def new_id():
    """Return a fresh random identifier: the hex form of a version-4 UUID."""
    identifier = uuid.uuid4()
    return identifier.hex

def list_partition(lines, predicate):
    """Split ``lines`` into two lists according to ``predicate``.

    Args:
        lines: iterable of items to classify.
        predicate: callable applied to each item; a truthy result selects the
            first list, a falsy result the second.

    Returns:
        tuple: ``(matching, rest)``, each preserving the input order.
    """
    matching, rest = [], []
    for item in lines:
        if predicate(item):
            matching.append(item)
        else:
            rest.append(item)
    return matching, rest

def write_file(filename, text):
    """Overwrite ``filename`` with ``text`` (text mode).

    Returns:
        The value returned by the file object's ``write`` call.
    """
    with open(filename, mode='w') as handle:
        written = handle.write(text)
        return written

def read_file(filename):
    """Return the entire content of ``filename``, opened in text mode."""
    with open(filename, mode='r') as handle:
        content = handle.read()
    return content


In [4]:
#export

from time import sleep
from IPython.display import display, Javascript
import subprocess
import os
import uuid
import re

def _get_notebook_path_and_save():
    """Save the running notebook and return its path on disk.

    A freshly generated token is printed so that it becomes part of this
    cell's output, the notebook is then saved from the browser, and the
    ``*.ipynb`` files in the current working directory are grepped until one
    of them contains the token.

    Returns:
        str: ``os.getcwd()`` joined with the stripped output of ``grep -l``.

    Note:
        There is no timeout: the loop polls every 0.1 s until a notebook file
        containing the token is found.
    """
    magic = new_id()
    print(magic)
    # saves it (ctrl+S)
    display(Javascript('IPython.notebook.save_checkpoint();'))
    nb_name = None
    while nb_name is None:
        sleep(0.1)
        try:
            nb_name = subprocess.check_output(
                f'grep -l {magic} *.ipynb', shell=True).decode().strip()
        except subprocess.CalledProcessError:
            # grep exits non-zero while no notebook on disk contains the token
            # yet, so keep polling. Only this error is swallowed: a bare
            # `except` would also eat KeyboardInterrupt and make the loop
            # impossible to interrupt.
            continue
    return os.path.join(os.getcwd(), nb_name)

def get_notebook_name():
    """Return the module name declared by the notebook's ``# default_exp`` line.

    The notebook is saved and located first, then its raw file content is
    searched for the directive; the captured word after it is returned.
    """
    notebook_source = read_file(_get_notebook_path_and_save())
    directive = re.search(r'\# default_exp (\w+) *', notebook_source)
    return directive.group(1)

# Cached result of get_notebook_name(); the cell magics fill it in lazily while it is still None.
_notebook_name = None


In [5]:
# Resolve the name once up front so the cell magics below do not have to save and search again.
_notebook_name = get_notebook_name()
_notebook_name

fb6090d7ea5442cf8ccf019f342ea338


<IPython.core.display.Javascript object>

'util'

In [6]:
#export

import subprocess

def run_shell(*args, **kwargs):
    """Run a command through the shell, capturing stdout and stderr.

    All arguments are forwarded to ``subprocess.run`` with ``shell=True`` and
    ``capture_output=True`` added.

    Returns:
        subprocess.CompletedProcess: the finished process.
    """
    completed = subprocess.run(*args, shell=True, capture_output=True, **kwargs)
    return completed
    
def run(*args, **kwargs):
    """Run a shell command and return its output as text.

    Returns:
        str: the decoded stdout when stderr is empty; otherwise
        ``'run() error: '`` followed by the decoded stderr (stdout is then
        discarded, whatever the exit code).
    """
    completed = run_shell(*args, **kwargs)
    stderr_text = completed.stderr.decode()
    if stderr_text == '':
        return completed.stdout.decode()
    return f'run() error: {stderr_text}'


In [7]:
#export

from IPython.core.magic import register_cell_magic

# TypeScript source accumulated by the `node` cell magic; it is replaced on every invocation.
_node_cache = ''

@register_cell_magic
def node(arg, cell, test=False):
    """Cell magic that accumulates TypeScript code and optionally runs it.

    Lines of ``cell`` that start with ``import `` are collected as imports
    (duplicates are dropped, first occurrence wins); all remaining lines are
    treated as code.

    Args:
        arg: the magic's argument, either ``'export'`` or ``'run'``.
            ``'export'`` appends the cell (under a ``// cell`` marker) to the
            cached module code. ``'run'`` does not add the cell to the cache:
            its code is wrapped in an async ``run`` function placed after the
            cached code and the resulting file is executed with node.
        cell: source text of the cell.
        test: when true, the ``<notebook name>.ts`` file is not written and
            the generated sources are returned for inspection.

    Returns:
        With ``test`` set: ``(result, new_code_ts, new_code_ipython)`` where
        ``result`` is the list of output lines (empty for ``'export'``).
        Otherwise: ``result`` when it is non-empty, else the path of the
        written ``.ts`` file.

    Raises:
        ValueError: if ``arg`` is neither ``'run'`` nor ``'export'``.

    Side effects:
        Always writes ``_ipython_node.ts`` and updates ``_node_cache``.
    """
    global _node_cache
    global _notebook_name

    # Reject unsupported modes before doing any work; without this check the
    # branches below leave their variables unassigned and the function dies
    # with an UnboundLocalError.
    if arg not in ('run', 'export'):
        raise ValueError(f"%%node expects 'run' or 'export' as its argument, got {arg!r}")

    ipython_node_path = '_ipython_node.ts'

    def split_imports(code): return list_partition(code.splitlines(), lambda line: line.startswith('import '))
    cache_imports, cache_exports = split_imports(_node_cache)
    cell_imports, cell_exports = split_imports(cell)
    # Trim blank lines around the cell's code; an import-only cell yields [''].
    cell_exports = '\n'.join(cell_exports).strip('\n').split('\n')

    if arg == 'run':
        # The cell is executed but not cached: only the scratch file sees it.
        ts_imports = cache_imports
        ipython_imports = cache_imports + cell_imports

        ts_exports = cache_exports
        ipython_exports = cache_exports + ([] if cell_exports == [''] else (['', '// cell', 'var run = async (...args) => {'] + cell_exports + ['}', f'await run("{arg}")']))
    else:  # arg == 'export'
        # The cell becomes part of the cached module; both files are identical.
        ts_imports = cache_imports + cell_imports
        ipython_imports = ts_imports

        ts_exports = cache_exports + ([] if cell_exports == [''] else (['', '// cell'] + cell_exports))
        ipython_exports = ts_exports

    def join_imports(imports, exports):
        return ''.join([
            # dict.fromkeys removes duplicate imports while keeping their order.
            '\n'.join(list(dict.fromkeys(imports))),
            '' if len(imports) == 0 or len(exports) == 0 else '\n\n\n',
            '\n'.join(exports).strip('\n') + '\n'
        ])
    new_code_ts = join_imports(ts_imports, ts_exports)
    new_code_ipython = join_imports(ipython_imports, ipython_exports)

    write_file(ipython_node_path, new_code_ipython)

    if arg == 'run':
        result = run(f'NODE_NO_WARNINGS=1 node --loader ts-node/esm --es-module-specifier-resolution=node {ipython_node_path}').splitlines()
    else:
        result = []

    _node_cache = new_code_ts

    if test:
        return result, new_code_ts, new_code_ipython
    else:
        if _notebook_name is None:
            _notebook_name = get_notebook_name()

        ts_node_path = f'{_notebook_name}.ts'

        write_file(ts_node_path, new_code_ts)
        return result if result != [] else ts_node_path


In [8]:
# Start the checks below from an empty code cache.
_node_cache = ''

def test_cell_node(arg, cell, expected_ts, expected_ipython, expected_result):
    """Run the ``node`` magic in test mode and compare against expectations.

    Prints a one-line summary when everything matches, otherwise a labelled
    dump of the inputs, the generated values and the expected values.
    """
    result, new_code_ts, new_code_ipython = node(arg, cell, test=True)
    all_match = (new_code_ts == expected_ts
                 and new_code_ipython == expected_ipython
                 and result == expected_result)
    if all_match:
        print('arg:', arg, 'result:', expected_result)
        return

    report = [
        '\n|arg:', arg,
        '\n|cell:', cell,
        '\n|new_code_ts:', new_code_ts,
        '\n|expected_ts:', expected_ts,
        '\n|new_code_ipython:', new_code_ipython,
        '\n|expected_ipython:', expected_ipython,
        '\n|result:', str(result),
        '\n|expected_result:', str(expected_result),
    ]
    print('\n' + '\n'.join(report))

# The calls below are order-dependent: each one builds on the cache left by the previous call.

# An import-only export: just the import line ends up in the generated code.
test_cell_node(
    'export',
    'import * as os from "os"',
    'import * as os from "os"\n',
    'import * as os from "os"\n',
    [])

# Same import again plus code: the import is not duplicated, the code goes under a `// cell` marker.
test_cell_node(
    'export',
    'import * as os from "os"\nvar a = os.EOL',
    'import * as os from "os"\n\n\n// cell\nvar a = os.EOL\n',
    'import * as os from "os"\n\n\n// cell\nvar a = os.EOL\n',
    [])

# A second export appends its import and its code after the existing ones.
test_cell_node(
    'export',
    'import * as fs from "fs"\nvar b = fs.constants.F_OK',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n',
    [])

# `run` wraps the cell in an async `run` function in the executed file only; the cached code is unchanged.
test_cell_node(
    'run',
    'console.log(a)',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n\n// cell\nvar run = async (...args) => {\nconsole.log(a)\n}\nawait run("run")\n',
    ['', ''])

# A second run sees the same cached code; the previous run's cell was not kept.
test_cell_node(
    'run',
    'console.log(b)',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n\n// cell\nvar run = async (...args) => {\nconsole.log(b)\n}\nawait run("run")\n',
    ['0'])

# Exporting after runs keeps accumulating; the repeated `fs` import is still not duplicated.
test_cell_node(
    'export',
    'import * as fs from "fs"\nvar c = fs.constants.X_OK',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n\n// cell\nvar c = fs.constants.X_OK\n',
    'import * as os from "os"\nimport * as fs from "fs"\n\n\n// cell\nvar a = os.EOL\n\n// cell\nvar b = fs.constants.F_OK\n\n// cell\nvar c = fs.constants.X_OK\n',
    [])

# Drop the code accumulated by the checks so later cells start from an empty cache.
_node_cache = ''

arg: export result: []
arg: export result: []
arg: export result: []
arg: run result: ['', '']
arg: run result: ['0']
arg: export result: []


In [9]:
#export

from IPython.core.magic import register_cell_magic

def _get_spiral_cache_empty(): return {'spi': '', 'spir': '', '': ''}
_spiral_cache = _get_spiral_cache_empty()

@register_cell_magic
def spiral(arg, cell, test=False):
    """Cell magic that accumulates Spiral code per target and writes it to disk.

    Lines of ``cell`` that start with ``open `` are collected at the top of
    the generated file (duplicates are dropped, first occurrence wins); the
    remaining lines are appended under a ``// cell`` marker.

    Args:
        arg: cache key and target selector. ``'spi'`` and ``'spir'`` write to
            ``<notebook name>.<arg>``; the empty string writes to ``main.spi``.
        cell: source text of the cell.
        test: when true, nothing is written and the generated code is returned.

    Returns:
        The generated code when ``test`` is set, otherwise the written path.

    Raises:
        KeyError: if ``arg`` is not a key of ``_spiral_cache``.
    """
    global _spiral_cache
    global _notebook_name

    def split_imports(code): return list_partition(code.splitlines(), lambda line: line.startswith('open '))
    cache_imports, cache_exports = split_imports(_spiral_cache[arg])
    cell_imports, cell_exports = split_imports(cell)
    # Trim blank lines around the cell's code; an open-only cell yields [''].
    cell_exports = '\n'.join(cell_exports).strip('\n').split('\n')
    spi_imports = cache_imports + cell_imports
    spi_exports = cache_exports + ([] if cell_exports == [''] else (['', '// cell'] + cell_exports))

    def join_imports(imports, exports):
        return ''.join([
            # dict.fromkeys removes duplicate `open` lines while keeping their order.
            '\n'.join(list(dict.fromkeys(imports))),
            '' if len(imports) == 0 or len(exports) == 0 else '\n\n\n',
            '\n'.join(exports).strip('\n') + '\n'
        ])
    new_code_spi = join_imports(spi_imports, spi_exports)

    if test:
        # Nothing is written in test mode, so the output path is not needed.
        # Returning here avoids the notebook-name lookup and its side effects,
        # matching how the `node` magic treats test mode.
        _spiral_cache[arg] = new_code_spi
        return new_code_spi

    if arg in ['spi', 'spir']:
        if _notebook_name is None:
            _notebook_name = get_notebook_name()
        spi_path = f'{_notebook_name}.{arg}'
    else:
        spi_path = 'main.spi'

    # The cache is updated only after the path is known, so a failed name
    # lookup does not leave this cell in the cache.
    _spiral_cache[arg] = new_code_spi

    write_file(spi_path, new_code_spi)
    return spi_path

@register_cell_magic
def spi(arg, cell, test=False):
    """Cell magic shortcut for ``spiral`` with the ``'spi'`` target; ``arg`` is ignored."""
    target = 'spi'
    return spiral(target, cell, test)

@register_cell_magic
def spir(arg, cell, test=False):
    """Cell magic shortcut for ``spiral`` with the ``'spir'`` target; ``arg`` is ignored."""
    target = 'spir'
    return spiral(target, cell, test)


In [10]:
# Start the checks below from an empty cache.
_spiral_cache = _get_spiral_cache_empty()

def test_cell_spiral(arg, cell, expected_spi):
    """Run the ``spiral`` magic in test mode and compare the generated code.

    Prints the argument when the code matches, otherwise a labelled dump of
    the inputs, the generated code and the expected code.
    """
    new_code_spi = spiral(arg, cell, test=True)

    if new_code_spi == expected_spi:
        print('arg:', arg)
        return

    report = [
        '\n|arg:', arg,
        '\n|cell:', cell,
        '\n|new_code_spi:', new_code_spi,
        '\n|expected_spi:', expected_spi,
    ]
    print('\n' + '\n'.join(report))

# The calls below are order-dependent: each one builds on the cache left by the previous call.

# An open-only cell: just the `open` line ends up in the generated code.
test_cell_spiral(
    'spi',
    'open a',
    'open a\n'
)

# Same `open` again plus code: the `open` is not duplicated, the code goes under a `// cell` marker.
test_cell_spiral(
    'spi',
    'open a\ninl get1 () = 1i32',
    'open a\n\n\n// cell\ninl get1 () = 1i32\n'
)

# A new `open` is appended after the existing one, and the code after the existing code.
test_cell_spiral(
    'spi',
    'open b\ninl get2 () = 2i32',
    'open a\nopen b\n\n\n// cell\ninl get1 () = 1i32\n\n// cell\ninl get2 () = 2i32\n'
)

# A repeated `open b` is still not duplicated while the code keeps accumulating.
test_cell_spiral(
    'spi',
    'open b\ninl get3 () = 3i32',
    'open a\nopen b\n\n\n// cell\ninl get1 () = 1i32\n\n// cell\ninl get2 () = 2i32\n\n// cell\ninl get3 () = 3i32\n'
)

# Drop the code accumulated by the checks so later cells start from an empty cache.
_spiral_cache = _get_spiral_cache_empty()

arg: spi
arg: spi
arg: spi
arg: spi


In [11]:
%%spi

// NOTE(review): presumably declares a nominal type backed by the target's "obj" type through a macro string; confirm.
nominal _obj = $"obj"

// Helper that asserts `b`, always using the message "test".
inl test b = assert b "test"


'util.spi'

# node

In [12]:
%%node export

import * as fs from "fs"

// Shared timer origin; `logStep` and `waitFileChange` reset it.
var start = process.hrtime()

// Sub-second part of the time elapsed since `start`, in milliseconds.
// Only the nanosecond component of `process.hrtime(start)` is used, so whole seconds are not included.
export var elapsed = (start: [number, number]) => {
    const [, nanoseconds] = process.hrtime(start)
    return nanoseconds / 1000000
}
// Logs the time since the shared `start` (whole seconds plus the sub-second milliseconds)
// together with `note`, then restarts the shared timer.
export var logStep = (note: string) => {
    const seconds = process.hrtime(start)[0]
    const millis = elapsed(start).toFixed(3)
    console.log(`${seconds} s, ${millis} ms - ${note}`)
    start = process.hrtime()
}
// Returns a promise that resolves after `ms` milliseconds.
export var sleep = (ms: number) => new Promise(resolve => {
    setTimeout(resolve, ms)
})
// Waits until the file at `path` has a different modification time than at call time
// and is non-empty, or until 30 seconds have passed, polling every 50 ms.
// Also restarts the shared `start` timer.
export var waitFileChange = async (path: string) => {
    const waitStart = process.hrtime()
    start = waitStart
    const stat_mtime = fs.statSync(path).mtimeMs
    // `elapsed()` only reports the sub-second part (always below 1000 ms), so comparing
    // it with 30000 never ends the loop. The timeout needs both hrtime components.
    const waitedMs = () => {
        const [seconds, nanoseconds] = process.hrtime(waitStart)
        return seconds * 1000 + nanoseconds / 1000000
    }
    while (waitedMs() < 30000
        && (fs.statSync(path).mtimeMs === stat_mtime
            || fs.readFileSync(path).length === 0)) {
        await sleep(50)
    }
}

'util.ts'

# py compile

In [None]:
# Runs nbdev's build command in a shell; presumably exports the `#export` cells to the module named by `default_exp` (confirm against the nbdev version in use).
!nbdev_build_lib