In [None]:
#|hide
#|default_exp process

# process
- A notebook processor

In [None]:
#|export
from nbprocess.read import *
from nbprocess.maker import *
from nbprocess.imports import *

from fastcore.script import *
from fastcore.imports import *
from fastcore.xtras import *

from collections import defaultdict
from pprint import pformat
from inspect import signature,Parameter
import ast,contextlib,copy

In [None]:
from fastcore.test import *
from pdb import set_trace
from importlib import reload
import shutil

Special comments at the start of a cell can be used to provide information to `nbprocess` about how to process a cell, so we need to be able to find the location of these comments.

In [None]:
# Read the `minimal.ipynb` notebook from the tests folder
minimal = read_nb('../tests/minimal.ipynb')

In [None]:
#|export
def _directive(s):
    s = (s.strip()[2:]).strip().split()
    if not s: return None
    direc,*args = s
    return direc,args

In [None]:
#|export
def extract_directives(cell, remove=True):
    "Take leading `#|` comment directives from the source of `cell`, remove `#|`, and split into a dict of name -> args"
    ss = cell.source.splitlines(True)
    # Index of the first line that is blank or is not a `#|` comment. When every line is a
    # directive there is no such line, so fall back to `len(ss)`: the whole cell is directives
    # (previously this case left the directive lines in the source even with `remove=True`).
    first_code = next((i for i,o in enumerate(ss) if not o.strip() or not re.match(r'\s*#\|', o)), len(ss))
    # Covers both an empty source and a cell that does not start with a directive
    if first_code==0: return {}
    if remove: cell['source'] = ''.join(ss[first_code:])
    # `_directive` returns `None` for a bare `#|` line; those are skipped
    direcs = (_directive(s) for s in ss[:first_code])
    return {d[0]:d[1] for d in direcs if d}

Comment directives start with `#|`, followed by whitespace-delimited tokens, which `extract_directives` extracts from the start of a cell, up until a blank line or a line containing something other than a `#|` comment. The extracted lines are removed from the source.

In [None]:
# A cell whose source starts with two directive lines, followed by code and an ordinary comment
exp  = AttrDict(source = """#|export module
#| hide
1+2
#bar""")
# The directives come back as a dict mapping directive name to its list of arguments...
test_eq(extract_directives(exp), dict(export=['module'],hide=[]))
# ...and the directive lines are gone from the cell's source
test_eq(exp.source, "1+2\n#bar")

In [None]:
#|export
def opt_set(var, newval):
    "Return `newval` when it is truthy, otherwise fall back to `var`"
    return newval or var

In [None]:
#|export
def instantiate(x):
    "Return `x()` when `x` is a class, otherwise `x` unchanged"
    if not isinstance(x, type): return x
    return x()

def _mk_procs(procs):
    "Wrap `procs` in an `L` and `instantiate` each entry"
    wrapped = L(procs)
    return wrapped.map(instantiate)

In [None]:
#|export
def _is_direc(f): return getattr(f, '__name__', '-')[-1]=='_'

In [None]:
#|export
class NBProcessor:
    "Process cells and nbdev comments in a notebook"
    def __init__(self, path=None, procs=None, preprocs=None, postprocs=None, nb=None, debug=False, rm_directives=True):
        # A notebook passed as `nb` is used as-is; `path` is only read when `nb` is None
        self.nb = nb if nb is not None else read_nb(path)
        # Each group of processors is normalised by `_mk_procs`
        self.procs = _mk_procs(procs)
        self.preprocs = _mk_procs(preprocs)
        self.postprocs = _mk_procs(postprocs)
        self.debug = debug
        self.rm_directives = rm_directives

    def _process_cell(self, cell):
        "Apply every proc in `self.procs` to `cell`, stopping as soon as `cell` has no `source` attribute"
        self.cell = cell
        for proc in self.procs:
            # Directive handlers are only looked up for code cells
            directives = cell.directives_.items() if cell.cell_type=='code' else ()
            for cmd,args in directives:
                self._process_comment(proc, cell, cmd, args)
                if not hasattr(cell,'source'): return
            # Callables named like `directive_` act only as directive handlers, not as per-cell procs
            if callable(proc) and not _is_direc(proc):
                cell = opt_set(cell, proc(cell))
            if not hasattr(cell,'source'): return

    def _process_comment(self, proc, cell, cmd, args):
        "Call `proc`'s handler for directive `cmd`, if it has one, as `handler(self, cell, *args)`"
        handler = None
        # A callable whose name is `cmd` plus a trailing underscore is itself the handler
        if _is_direc(proc) and getattr(proc, '__name__', '-')[:-1]==cmd: handler = proc
        # Otherwise look for a `_{cmd}_` attribute on the proc
        if handler is None: handler = getattr(proc, f'_{cmd}_', None)
        if not handler: return
        if self.debug: print(cmd, args, handler)
        return handler(self, cell, *args)

    def process(self):
        "Run preprocs, extract directives, run `_process_cell` on every cell, run postprocs, then drop cells without a `source`"
        for pre in self.preprocs:
            self.nb = opt_set(self.nb, pre(self.nb))
            # Renumber the cells after each preproc
            for idx,c in enumerate(self.nb.cells): c.idx_ = idx
        # Directives are extracted for all cells before any cell is processed
        for c in self.nb.cells: c.directives_ = extract_directives(c, remove=self.rm_directives)
        for c in self.nb.cells: self._process_cell(c)
        for post in self.postprocs: self.nb = opt_set(self.nb, post(self.nb))
        # Keep only truthy cells that still have a non-None `source`
        kept = []
        for c in self.nb.cells:
            if c and getattr(c,'source',None) is not None: kept.append(c)
        self.nb.cells = kept

Cell processors can be callables (e.g. regular functions), in which case they are called for every cell:

In [None]:
everything_fn = '../tests/01_everything.ipynb'

def print_execs(cell):
    "Print the source of every cell whose source contains `exec`"
    src = cell.source
    if 'exec' in src: print(src)

NBProcessor(everything_fn, print_execs).process()

exec("o_y=1")
exec("p_y=1")
_all_ = [o_y, 'p_y']


Comment directives are put in a cell attribute `directives_` as a dictionary keyed by directive name:

In [None]:
def printme_func(cell):
    "Print the argument list of a cell's `printme` directive, if the cell has one"
    direcs = cell.directives_
    if 'printme' in direcs: print(direcs['printme'])

NBProcessor(everything_fn, printme_func).process()

['testing']


However, a more convenient way to handle comment directives is to use a *class* as a processor, and include a method in your class with the same name as your directive, surrounded by underscores:

In [None]:
class _PrintExample:
    "Processor with a handler method for the `printme` directive"
    def _printme_(self, nbp, cell, to_print):
        "Print the argument passed to the `printme` directive"
        print(to_print)

NBProcessor(everything_fn, _PrintExample()).process()

testing


In the case that your processor supports just one comment directive, you can just use a regular function, with the same name as your directive, but with an underscore appended -- here `printme_` is identical to `_PrintExample` above:

In [None]:
def printme_(nbp, cell, to_print):
    "Handle the `printme` directive by printing its argument"
    print(to_print)

NBProcessor(everything_fn, printme_).process()

testing


## Export -

In [None]:
#|skip
# NOTE(review): `basic_export_nb2` presumably exports each notebook to the named module — confirm
basic_export_nb2('01_read.ipynb', 'read')
basic_export_nb2('02_maker.ipynb', 'maker')
basic_export_nb2('03_process.ipynb', 'process')

# Check that `nbprocess.process` can be imported and exposes `NBProcessor`
# (assumes `exec_new` returns the namespace the import ran in — TODO confirm)
g = exec_new('import nbprocess.process')
assert hasattr(g['nbprocess'].process, 'NBProcessor')